use crate::multiaddr::Protocol;
use crate::rpc::{MetaData, MetaDataV2, MetaDataV3};
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind};
use crate::{GossipTopic, NetworkConfig};
use futures::future::Either;
use gossipsub;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::identity::{Keypair, secp256k1};
use libp2p::metrics::Registry;
use libp2p::{PeerId, Transport, core, noise, yamux};
use ssz::Decode;
use std::collections::HashSet;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
use types::{
    ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId,
};

/// The filename to store our local network key.
pub const NETWORK_KEY_FILENAME: &str = "key";
/// The filename to store our local metadata.
pub const METADATA_FILENAME: &str = "metadata";

/// Groups the shared inputs of the network layer into a single value.
///
/// NOTE(review): how this struct is consumed is not visible in this file; presumably it is
/// handed to the network service on construction — confirm against the caller.
pub struct Context<'a> {
    /// The network configuration.
    pub config: Arc<NetworkConfig>,
    /// The ENR fork id.
    pub enr_fork_id: EnrForkId,
    /// The fork context.
    pub fork_context: Arc<ForkContext>,
    /// The chain specification.
    pub chain_spec: Arc<ChainSpec>,
    /// Optional mutable borrow of a libp2p metrics registry.
    pub libp2p_registry: Option<&'a mut Registry>,
}

/// A boxed transport that yields the remote peer id together with a boxed stream muxer.
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;

/// The implementation supports TCP/IP, QUIC (experimental) over UDP, noise as the encryption layer, and
/// mplex/yamux as the multiplexing layer (when using TCP).
pub fn build_transport( local_private_key: Keypair, quic_support: bool, ) -> std::io::Result { // mplex config let mut mplex_config = libp2p_mplex::Config::new(); mplex_config.set_max_buffer_size(256); mplex_config.set_max_buffer_behaviour(libp2p_mplex::MaxBufferBehaviour::Block); // yamux config let yamux_config = yamux::Config::default(); // Creates the TCP transport layer let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) .upgrade(core::upgrade::Version::V1) .authenticate(generate_noise_config(&local_private_key)) .multiplex(core::upgrade::SelectUpgrade::new( yamux_config, mplex_config, )) .timeout(Duration::from_secs(10)); let transport = if quic_support { // Enables Quic // The default quic configuration suits us for now. let quic_config = libp2p::quic::Config::new(&local_private_key); let quic = libp2p::quic::tokio::Transport::new(quic_config); let transport = tcp .or_transport(quic) .map(|either_output, _| match either_output { Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), }); transport.boxed() } else { tcp.boxed() }; // Enables DNS over the transport. let transport = libp2p::dns::tokio::Transport::system(transport)?.boxed(); Ok(transport) } fn keypair_from_hex(hex_bytes: &str) -> Result { let hex_bytes = if let Some(stripped) = hex_bytes.strip_prefix("0x") { stripped.to_string() } else { hex_bytes.to_string() }; hex::decode(hex_bytes) .map_err(|e| format!("Failed to parse p2p secret key bytes: {:?}", e)) .and_then(keypair_from_bytes) } fn keypair_from_bytes(mut bytes: Vec) -> Result { secp256k1::SecretKey::try_from_bytes(&mut bytes) .map(|secret| { let keypair: secp256k1::Keypair = secret.into(); keypair.into() }) .map_err(|e| format!("Unable to parse p2p secret key: {:?}", e)) } /// Loads a private key from disk. If this fails, a new key is /// generated and is then saved to disk. 
/// /// Currently only secp256k1 keys are allowed, as these are the only keys supported by discv5. /// Supports both hex format (with or without 0x prefix) and raw bytes format. pub fn load_private_key(config: &NetworkConfig) -> Keypair { // check for key from disk let network_key_f = config.network_dir.join(NETWORK_KEY_FILENAME); if let Ok(mut network_key_file) = File::open(network_key_f.clone()) { // Limit read to reasonable hex key size: 32 bytes = 64 hex chars + "0x" prefix + whitespace let mut buffer = vec![0u8; 70]; match network_key_file.read(&mut buffer) { Ok(bytes_read) => { if let Ok(hex_string) = String::from_utf8(buffer[..bytes_read].to_vec()) { // First try to parse as hex string let hex_content = hex_string.trim(); if let Ok(keypair) = keypair_from_hex(hex_content) { debug!("Loaded network key from disk (hex format)."); return keypair; } } } Err(_) => debug!("Could not read network key file as string, trying binary format"), } // If hex parsing failed or file couldn't be read as string, try binary format if let Ok(mut network_key_file) = File::open(network_key_f.clone()) { let mut key_bytes: Vec = Vec::with_capacity(36); match network_key_file.read_to_end(&mut key_bytes) { Err(_) => debug!("Could not read network key file"), Ok(_) => { // only accept secp256k1 keys for now if let Ok(secret_key) = secp256k1::SecretKey::try_from_bytes(&mut key_bytes) { let kp: secp256k1::Keypair = secret_key.clone().into(); debug!( "Loaded network key from disk (binary format), migrating to hex format." 
); // Migrate binary key to hex format let hex_key = hex::encode(secret_key.to_bytes()); if let Err(e) = File::create(network_key_f.clone()) .and_then(|mut f| f.write_all(hex_key.as_bytes())) { debug!("Failed to migrate key to hex format: {}", e); } else { debug!("Successfully migrated key to hex format."); } return kp.into(); } else { debug!("Network key file is not a valid secp256k1 key"); } } } } } // if a key could not be loaded from disk, generate a new one and save it let local_private_key = secp256k1::Keypair::generate(); let _ = std::fs::create_dir_all(&config.network_dir); let hex_key = hex::encode(local_private_key.secret().to_bytes()); match File::create(network_key_f.clone()).and_then(|mut f| f.write_all(hex_key.as_bytes())) { Ok(_) => { debug!("New network key generated and written to disk"); } Err(e) => { warn!( "Could not write node key to file: {:?}. error: {}", network_key_f, e ); } } local_private_key.into() } /// Generate authenticated XX Noise config from identity keys fn generate_noise_config(identity_keypair: &Keypair) -> noise::Config { noise::Config::new(identity_keypair).expect("signing can fail only once during starting a node") } /// For a multiaddr that ends with a peer id, this strips this suffix. Rust-libp2p /// only supports dialing to an address without providing the peer id. pub fn strip_peer_id(addr: &mut Multiaddr) { let last = addr.pop(); match last { Some(Protocol::P2p(_)) => {} Some(other) => addr.push(other), _ => {} } } /// Load metadata from persisted file. Return default metadata if loading fails. pub fn load_or_build_metadata( network_dir: &Path, custody_group_count: u64, ) -> MetaData { // We load a V3 metadata version by default (regardless of current fork) // since a V3 metadata can be converted to V1 or V2. The RPC encoder is responsible // for sending the correct metadata version based on the negotiated protocol version. 
let mut meta_data = MetaDataV3 { seq_number: 0, attnets: EnrAttestationBitfield::::default(), syncnets: EnrSyncCommitteeBitfield::::default(), custody_group_count, }; // Read metadata from persisted file if available let metadata_path = network_dir.join(METADATA_FILENAME); if let Ok(mut metadata_file) = File::open(metadata_path) { let mut metadata_ssz = Vec::new(); if metadata_file.read_to_end(&mut metadata_ssz).is_ok() { // Attempt to read a MetaDataV3 version from the persisted file, // if that fails, read MetaDataV2 match MetaDataV3::::from_ssz_bytes(&metadata_ssz) { Ok(persisted_metadata) => { meta_data.seq_number = persisted_metadata.seq_number; // Increment seq number if persisted attnet is not default if persisted_metadata.attnets != meta_data.attnets || persisted_metadata.syncnets != meta_data.syncnets || persisted_metadata.custody_group_count != meta_data.custody_group_count { meta_data.seq_number += 1; } debug!("Loaded metadata from disk"); } Err(_) => { match MetaDataV2::::from_ssz_bytes(&metadata_ssz) { Ok(persisted_metadata) => { let persisted_metadata = MetaData::V2(persisted_metadata); // Increment seq number as the persisted metadata version is updated meta_data.seq_number = *persisted_metadata.seq_number() + 1; debug!("Loaded metadata from disk"); } Err(e) => { debug!( error = ?e, "Metadata from file could not be decoded" ); } } } } } }; debug!(seq_num = meta_data.seq_number, "Metadata sequence number"); let meta_data = MetaData::V3(meta_data); save_metadata_to_disk(network_dir, meta_data.clone()); meta_data } /// Creates a whitelist topic filter that covers all possible topics using the given set of /// possible fork digests. 
pub(crate) fn create_whitelist_filter( possible_fork_digests: Vec<[u8; 4]>, spec: &ChainSpec, sync_committee_subnet_count: u64, ) -> gossipsub::WhitelistSubscriptionFilter { let mut possible_hashes = HashSet::new(); for fork_digest in possible_fork_digests { let mut add = |kind| { let topic: gossipsub::IdentTopic = GossipTopic::new(kind, GossipEncoding::SSZSnappy, fork_digest).into(); possible_hashes.insert(topic.hash()); }; use GossipKind::*; add(BeaconBlock); add(BeaconAggregateAndProof); add(VoluntaryExit); add(ProposerSlashing); add(AttesterSlashing); add(SignedContributionAndProof); add(BlsToExecutionChange); add(LightClientFinalityUpdate); add(LightClientOptimisticUpdate); for id in 0..spec.attestation_subnet_count { add(Attestation(SubnetId::new(id))); } for id in 0..sync_committee_subnet_count { add(SyncCommitteeMessage(SyncSubnetId::new(id))); } let blob_subnet_count = spec.blob_sidecar_subnet_count_max(); for id in 0..blob_subnet_count { add(BlobSidecar(id)); } for id in 0..spec.data_column_sidecar_subnet_count { add(DataColumnSidecar(DataColumnSubnetId::new(id))); } } gossipsub::WhitelistSubscriptionFilter(possible_hashes) } /// Persist metadata to disk pub(crate) fn save_metadata_to_disk(dir: &Path, metadata: MetaData) { let _ = std::fs::create_dir_all(dir); // We always store the metadata v2 to disk because // custody_group_count parameter doesn't need to be persisted across runs. // custody_group_count is what the user sets it for the current run. // This is to prevent ugly branching logic when reading the metadata from disk. let metadata_bytes = metadata.metadata_v2().as_ssz_bytes(); match File::create(dir.join(METADATA_FILENAME)).and_then(|mut f| f.write_all(&metadata_bytes)) { Ok(_) => { debug!("Metadata written to disk"); } Err(e) => { warn!( file = format!("{:?}{:?}", dir, METADATA_FILENAME), error = %e, "Could not write metadata to disk" ); } } }