diff --git a/README.md b/README.md index 1f71315f50..85290f6c86 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,6 @@ If you'd like some background on Sigma Prime, please see the [Lighthouse Update - [`validator_client/`](validator_client/): the "Validator Client" binary and crates exclusively associated with it. - ## Contributing **Lighthouse welcomes contributors.** diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 48504d89ad..b3c687eef2 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -13,3 +13,4 @@ slog-async = "^2.3.0" validator_client = { path = "../validator_client" } types = { path = "../eth2/types" } eth2_config = { path = "../eth2/utils/eth2_config" } +dirs = "2.0.1" diff --git a/account_manager/src/main.rs b/account_manager/src/main.rs index 1c8cc8819a..68c304ec53 100644 --- a/account_manager/src/main.rs +++ b/account_manager/src/main.rs @@ -1,12 +1,12 @@ use bls::Keypair; use clap::{App, Arg, SubCommand}; -use eth2_config::get_data_dir; use slog::{crit, debug, info, o, Drain}; +use std::fs; use std::path::PathBuf; use types::test_utils::generate_deterministic_keypair; use validator_client::Config as ValidatorClientConfig; -pub const DEFAULT_DATA_DIR: &str = ".lighthouse-account-manager"; +pub const DEFAULT_DATA_DIR: &str = ".lighthouse-validator"; pub const CLIENT_CONFIG_FILENAME: &str = "account-manager.toml"; fn main() { @@ -61,14 +61,34 @@ fn main() { ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(log, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let mut client_config = ValidatorClientConfig::default(); if let Err(e) = client_config.apply_cli_args(&matches) { diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 309f162e55..9e96f8484b 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -22,3 +22,5 @@ tokio-timer = "0.2.10" futures = "0.1.25" exit-future = "0.1.3" state_processing = { path = "../eth2/state_processing" } +env_logger = "0.6.1" +dirs = "2.0.1" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index abf1bc6472..793ce79cdd 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -20,9 +20,12 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" slot_clock = { path = "../../eth2/utils/slot_clock" } -ssz = { path = "../../eth2/utils/ssz" } -ssz_derive = { path = "../../eth2/utils/ssz_derive" } +eth2_ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" } state_processing = { path = "../../eth2/state_processing" } tree_hash = { path = "../../eth2/utils/tree_hash" } types = { path = "../../eth2/types" } lmd_ghost = { path = "../../eth2/lmd_ghost" } + +[dev-dependencies] +rand = "0.5.5" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0137a0746b..2d82822701 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,7 +6,7 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use lmd_ghost::LmdGhost; use log::trace; use operation_pool::DepositInsertStatus; -use operation_pool::OperationPool; +use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{RwLock, RwLockReadGuard}; use slot_clock::SlotClock; use state_processing::per_block_processing::errors::{ @@ -147,11 +147,13 @@ impl BeaconChain { let last_finalized_root = p.canonical_head.beacon_state.finalized_root; let last_finalized_block = &p.canonical_head.beacon_block; + let op_pool = p.op_pool.into_operation_pool(&p.state, &spec); + Ok(Some(BeaconChain { spec, slot_clock, fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root), - op_pool: OperationPool::default(), + op_pool, canonical_head: RwLock::new(p.canonical_head), state: RwLock::new(p.state), genesis_block_root: p.genesis_block_root, @@ -164,6 +166,7 @@ impl BeaconChain { pub fn persist(&self) -> Result<(), Error> { let p: PersistedBeaconChain = PersistedBeaconChain { canonical_head: self.canonical_head.read().clone(), + op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), genesis_block_root: self.genesis_block_root, state: self.state.read().clone(), }; @@ -506,8 +509,7 @@ impl BeaconChain { &self, deposit: Deposit, ) -> Result { - self.op_pool - .insert_deposit(deposit, &*self.state.read(), &self.spec) + self.op_pool.insert_deposit(deposit) } /// Accept some exit and queue it for inclusion in an appropriate block. diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index f5bdfdee15..479e1cd8e9 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,4 +1,5 @@ use crate::{BeaconChainTypes, CheckPoint}; +use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, StoreItem}; @@ -10,7 +11,7 @@ pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; #[derive(Encode, Decode)] pub struct PersistedBeaconChain { pub canonical_head: CheckPoint, - // TODO: operations pool. + pub op_pool: PersistedOperationPool, pub genesis_block_root: Hash256, pub state: BeaconState, } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 164857e5d8..9b3f7c1cb6 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -14,6 +14,8 @@ use types::{ Hash256, Keypair, RelativeEpoch, SecretKey, Signature, Slot, }; +pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; + /// Indicates how the `BeaconChainHarness` should produce blocks. #[derive(Clone, Copy, Debug)] pub enum BlockStrategy { @@ -68,8 +70,8 @@ where E: EthSpec, { pub chain: BeaconChain>, - keypairs: Vec, - spec: ChainSpec, + pub keypairs: Vec, + pub spec: ChainSpec, } impl BeaconChainHarness diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 17e373ad6a..882d9f2355 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -1,16 +1,21 @@ #![cfg(not(debug_assertions))] -use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, CommonTypes, PersistedBeaconChain, + BEACON_CHAIN_DB_KEY, +}; use lmd_ghost::ThreadSafeReducedTree; -use store::MemoryStore; -use types::{EthSpec, MinimalEthSpec, Slot}; +use rand::Rng; +use store::{MemoryStore, Store}; +use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; +use types::{Deposit, EthSpec, Hash256, MinimalEthSpec, Slot}; // Should ideally be divisible by 3. pub const VALIDATOR_COUNT: usize = 24; -fn get_harness( - validator_count: usize, -) -> BeaconChainHarness, MinimalEthSpec> { +type TestForkChoice = ThreadSafeReducedTree; + +fn get_harness(validator_count: usize) -> BeaconChainHarness { let harness = BeaconChainHarness::new(validator_count); // Move past the zero slot. @@ -225,3 +230,38 @@ fn does_not_finalize_without_attestation() { "no epoch should have been finalized" ); } + +#[test] +fn roundtrip_operation_pool() { + let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; + + let harness = get_harness(VALIDATOR_COUNT); + + // Add some attestations + harness.extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + assert!(harness.chain.op_pool.num_attestations() > 0); + + // Add some deposits + let rng = &mut XorShiftRng::from_seed([66; 16]); + for _ in 0..rng.gen_range(1, VALIDATOR_COUNT) { + harness + .chain + .process_deposit(Deposit::random_for_test(rng)) + .unwrap(); + } + + // TODO: could add some other operations + harness.chain.persist().unwrap(); + + let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); + let p: PersistedBeaconChain> = + harness.chain.store.get(&key).unwrap().unwrap(); + + let restored_op_pool = p.op_pool.into_operation_pool(&p.state, &harness.spec); + + assert_eq!(harness.chain.op_pool, restored_op_pool); +} diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index f97302a7c0..94a529ea70 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -9,17 +9,20 @@ beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } store = { path = "../store" } http_server = { path = "../http_server" } +eth2-libp2p = { path = "../eth2-libp2p" } rpc = { path = "../rpc" } prometheus = "^0.6" types = { path = "../../eth2/types" } tree_hash = { path = "../../eth2/utils/tree_hash" } eth2_config = { path = "../../eth2/utils/eth2_config" } slot_clock = { path = "../../eth2/utils/slot_clock" } -serde = "1.0" +serde = "1.0.93" serde_derive = "1.0" error-chain = "0.12.0" -slog = "^2.2.3" -ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz = { path = "../../eth2/utils/ssz" } +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog-term = "^2.4.0" +slog-async = "^2.3.0" tokio = "0.1.15" clap = "2.32.0" dirs = "1.0.3" diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/config.rs similarity index 92% rename from beacon_node/client/src/client_config.rs rename to beacon_node/client/src/config.rs index 166725b61c..415ef0ec98 100644 --- a/beacon_node/client/src/client_config.rs +++ b/beacon_node/client/src/config.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; /// The core configuration of a Lighthouse beacon node. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ClientConfig { +pub struct Config { pub data_dir: PathBuf, pub db_type: String, db_name: String, @@ -16,7 +16,7 @@ pub struct ClientConfig { pub http: HttpServerConfig, } -impl Default for ClientConfig { +impl Default for Config { fn default() -> Self { Self { data_dir: PathBuf::from(".lighthouse"), @@ -24,14 +24,14 @@ impl Default for ClientConfig { db_name: "chain_db".to_string(), // Note: there are no default bootnodes specified. // Once bootnodes are established, add them here. - network: NetworkConfig::new(vec![]), + network: NetworkConfig::new(), rpc: rpc::RPCConfig::default(), http: HttpServerConfig::default(), } } } -impl ClientConfig { +impl Config { /// Returns the path to which the client may initialize an on-disk database. pub fn db_path(&self) -> Option { self.data_dir() @@ -49,7 +49,7 @@ impl ClientConfig { /// /// Returns an error if arguments are obviously invalid. May succeed even if some values are /// invalid. - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { + pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> { if let Some(dir) = args.value_of("datadir") { self.data_dir = PathBuf::from(dir); }; diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 18ddef7bb3..7eee8ac0a0 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,7 +1,7 @@ extern crate slog; mod beacon_chain_types; -mod client_config; +mod config; pub mod error; pub mod notifier; @@ -21,7 +21,7 @@ use tokio::timer::Interval; pub use beacon_chain::BeaconChainTypes; pub use beacon_chain_types::ClientType; pub use beacon_chain_types::InitialiseBeaconChain; -pub use client_config::ClientConfig; +pub use config::Config as ClientConfig; pub use eth2_config::Eth2Config; /// Main beacon node client service. This provides the connection and initialisation of the clients diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index cc6393e388..1fbd308725 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,15 +7,18 @@ edition = "2018" [dependencies] beacon_chain = { path = "../beacon_chain" } clap = "2.32.0" -# SigP repository until PR is merged -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" } +#SigP repository +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" -ssz = { path = "../../eth2/utils/ssz" } -ssz_derive = { path = "../../eth2/utils/ssz_derive" } -slog = "2.4.1" +eth2_ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" } +slog = { version = "^2.4.1" , features = ["max_level_trace", "release_max_level_trace"] } version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" error-chain = "0.12.0" +tokio-timer = "0.2.10" +dirs = "2.0.1" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 10b140c3b5..4e4cf24f3c 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,45 +1,72 @@ +use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, Rpc}; -use crate::NetworkConfig; +use crate::{error, NetworkConfig}; +use crate::{Topic, TopicHash}; use futures::prelude::*; use libp2p::{ core::{ + identity::Keypair, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - PublicKey, }, + discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, - identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, - ping::{Ping, PingEvent}, + ping::{Ping, PingConfig, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace, warn}; +use slog::{o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use std::num::NonZeroU32; +use std::time::Duration; use types::{Attestation, BeaconBlock}; -use types::{Topic, TopicHash}; -/// Builds the network behaviour for the libp2p Swarm. -/// Implements gossipsub message routing. +/// Builds the network behaviour that manages the core protocols of eth2. +/// This core behaviour is managed by `Behaviour` which adds peer management to all core +/// behaviours. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - // TODO: Add Kademlia for peer discovery - /// The events generated by this behaviour to be consumed in the swarm poll. + /// The serenity RPC specified in the wire-0 protocol. serenity_rpc: Rpc, - /// Allows discovery of IP addresses for peers on the network. - identify: Identify, /// Keep regular connection to peers and disconnect if absent. - // TODO: Keepalive, likely remove this later. - // TODO: Make the ping time customizeable. ping: Ping, + /// Kademlia for peer discovery. + discovery: Discovery, #[behaviour(ignore)] + /// The events generated by this behaviour to be consumed in the swarm poll. events: Vec, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, } +impl Behaviour { + pub fn new( + local_key: &Keypair, + net_conf: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { + let local_peer_id = local_key.public().clone().into_peer_id(); + let behaviour_log = log.new(o!()); + let ping_config = PingConfig::new() + .with_timeout(Duration::from_secs(30)) + .with_interval(Duration::from_secs(20)) + .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) + .with_keep_alive(false); + + Ok(Behaviour { + serenity_rpc: Rpc::new(log), + gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), + discovery: Discovery::new(local_key, net_conf, log)?, + ping: Ping::new(ping_config), + events: Vec::new(), + log: behaviour_log, + }) + } +} + // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour impl NetworkBehaviourEventProcess for Behaviour @@ -89,30 +116,6 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour -{ - fn inject_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Identified { - peer_id, mut info, .. - } => { - if info.listen_addrs.len() > 20 { - debug!( - self.log, - "More than 20 peers have been identified, truncating" - ); - info.listen_addrs.truncate(20); - } - self.events - .push(BehaviourEvent::Identified(peer_id, Box::new(info))); - } - IdentifyEvent::Error { .. } => {} - IdentifyEvent::SendBack { .. } => {} - } - } -} - impl NetworkBehaviourEventProcess for Behaviour { @@ -122,25 +125,6 @@ impl NetworkBehaviourEventProcess } impl Behaviour { - pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { - let local_peer_id = local_public_key.clone().into_peer_id(); - let identify_config = net_conf.identify_config.clone(); - let behaviour_log = log.new(o!()); - - Behaviour { - gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - serenity_rpc: Rpc::new(log), - identify: Identify::new( - identify_config.version, - identify_config.user_agent, - local_public_key, - ), - ping: Ping::new(), - events: Vec::new(), - log: behaviour_log, - } - } - /// Consumes the events list when polled. fn poll( &mut self, @@ -153,18 +137,23 @@ impl Behaviour { } } +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, _event: Discv5Event) { + // discv5 has no events to inject + } +} + /// Implements the combined behaviour for the libp2p service. impl Behaviour { + /* Pubsub behaviour functions */ + /// Subscribes to a gossipsub topic. pub fn subscribe(&mut self, topic: Topic) -> bool { self.gossipsub.subscribe(topic) } - /// Sends an RPC Request/Response via the RPC protocol. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.serenity_rpc.send_rpc(peer_id, rpc_event); - } - /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); @@ -172,14 +161,19 @@ impl Behaviour { self.gossipsub.publish(topic, message_bytes.clone()); } } + + /* Eth2 RPC behaviour functions */ + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); + } } /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), - Identified(PeerId, Box), - // TODO: This is a stub at the moment GossipMessage { source: PeerId, topics: Vec, diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index ee2add75eb..ea87075b78 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,89 +1,129 @@ use clap::ArgMatches; +use enr::Enr; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; use serde_derive::{Deserialize, Serialize}; -use types::multiaddr::{Error as MultiaddrError, Multiaddr}; +use std::path::PathBuf; +use std::time::Duration; + +/// The beacon node topic string to subscribe to. +pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; +pub const SHARD_TOPIC_PREFIX: &str = "shard"; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. pub struct Config { + /// Data directory where node's keyfile is stored + pub network_dir: PathBuf, + /// IP address to listen on. - listen_addresses: Vec, + pub listen_address: std::net::IpAddr, + + /// The TCP port that libp2p listens on. + pub libp2p_port: u16, + + /// The address to broadcast to peers about which address we are listening on. + pub discovery_address: std::net::IpAddr, + + /// UDP port that discovery listens on. + pub discovery_port: u16, + + /// Target number of connected peers. + pub max_peers: usize, + /// Gossipsub configuration parameters. #[serde(skip)] pub gs_config: GossipsubConfig, - /// Configuration parameters for node identification protocol. - #[serde(skip)] - pub identify_config: IdentifyConfig, + /// List of nodes to initially connect to. - boot_nodes: Vec, + pub boot_nodes: Vec, + /// Client version pub client_version: String, - /// List of topics to subscribe to as strings + + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } impl Default for Config { /// Generate a default network configuration. fn default() -> Self { + let mut network_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); + network_dir.push(".lighthouse"); + network_dir.push("network"); Config { - listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()], + network_dir, + listen_address: "127.0.0.1".parse().expect("vaild ip address"), + libp2p_port: 9000, + discovery_address: "127.0.0.1".parse().expect("valid ip address"), + discovery_port: 9000, + max_peers: 10, + //TODO: Set realistic values for production gs_config: GossipsubConfigBuilder::new() .max_gossip_size(4_000_000) + .inactivity_timeout(Duration::from_secs(90)) + .heartbeat_interval(Duration::from_secs(20)) .build(), - identify_config: IdentifyConfig::default(), boot_nodes: vec![], client_version: version::version(), - topics: vec![String::from("beacon_chain")], + topics: Vec::new(), } } } +/// Generates a default Config. impl Config { - pub fn new(boot_nodes: Vec) -> Self { - let mut conf = Config::default(); - conf.boot_nodes = boot_nodes; - - conf + pub fn new() -> Self { + Config::default() } - pub fn listen_addresses(&self) -> Result, MultiaddrError> { - self.listen_addresses.iter().map(|s| s.parse()).collect() - } + pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> { + if let Some(dir) = args.value_of("datadir") { + self.network_dir = PathBuf::from(dir).join("network"); + }; - pub fn boot_nodes(&self) -> Result, MultiaddrError> { - self.boot_nodes.iter().map(|s| s.parse()).collect() - } - - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { if let Some(listen_address_str) = args.value_of("listen-address") { - let listen_addresses = listen_address_str.split(',').map(Into::into).collect(); - self.listen_addresses = listen_addresses; + let listen_address = listen_address_str + .parse() + .map_err(|_| format!("Invalid listen address: {:?}", listen_address_str))?; + self.listen_address = listen_address; + self.discovery_address = listen_address; } - if let Some(boot_addresses_str) = args.value_of("boot-nodes") { - let boot_addresses = boot_addresses_str.split(',').map(Into::into).collect(); - self.boot_nodes = boot_addresses; + if let Some(max_peers_str) = args.value_of("maxpeers") { + self.max_peers = max_peers_str + .parse::() + .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?; + } + + if let Some(port_str) = args.value_of("port") { + let port = port_str + .parse::() + .map_err(|_| format!("Invalid port: {}", port_str))?; + self.libp2p_port = port; + self.discovery_port = port; + } + + if let Some(boot_enr_str) = args.value_of("boot-nodes") { + self.boot_nodes = boot_enr_str + .split(',') + .map(|enr| enr.parse().map_err(|_| format!("Invalid ENR: {}", enr))) + .collect::, _>>()?; + } + + if let Some(discovery_address_str) = args.value_of("discovery-address") { + self.discovery_address = discovery_address_str + .parse() + .map_err(|_| format!("Invalid discovery address: {:?}", discovery_address_str))? + } + + if let Some(disc_port_str) = args.value_of("disc-port") { + self.discovery_port = disc_port_str + .parse::() + .map_err(|_| format!("Invalid discovery port: {}", disc_port_str))?; } Ok(()) } } - -/// The configuration parameters for the Identify protocol -#[derive(Debug, Clone)] -pub struct IdentifyConfig { - /// The protocol version to listen on. - pub version: String, - /// The client's name and version for identification. - pub user_agent: String, -} - -impl Default for IdentifyConfig { - fn default() -> Self { - Self { - version: "/eth/serenity/1.0".to_string(), - user_agent: version::version(), - } - } -} diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs new file mode 100644 index 0000000000..44b4e655b0 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -0,0 +1,313 @@ +use crate::{error, NetworkConfig}; +/// This manages the discovery and management of peers. +/// +/// Currently using discv5 for peer discovery. +/// +use futures::prelude::*; +use libp2p::core::swarm::{ + ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::discv5::{Discv5, Discv5Event}; +use libp2p::enr::{Enr, EnrBuilder, NodeId}; +use libp2p::multiaddr::Protocol; +use slog::{debug, info, o, warn}; +use std::collections::HashSet; +use std::fs::File; +use std::io::prelude::*; +use std::str::FromStr; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; + +/// Maximum seconds before searching for extra peers. +const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60; +/// Initial delay between peer searches. +const INITIAL_SEARCH_DELAY: u64 = 5; +/// Local ENR storage filename. +const ENR_FILENAME: &str = "enr.dat"; + +/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 +/// libp2p protocol. +pub struct Discovery { + /// The peers currently connected to libp2p streams. + connected_peers: HashSet, + + /// The target number of connected peers on the libp2p interface. + max_peers: usize, + + /// The delay between peer discovery searches. + peer_discovery_delay: Delay, + + /// Tracks the last discovery delay. The delay is doubled each round until the max + /// time is reached. + past_discovery_delay: u64, + + /// The TCP port for libp2p. Used to convert an updated IP address to a multiaddr. Note: This + /// assumes that the external TCP port is the same as the internal TCP port if behind a NAT. + //TODO: Improve NAT handling limit the above restriction + tcp_port: u16, + + /// The discovery behaviour used to discover new peers. + discovery: Discv5, + + /// Logger for the discovery behaviour. + log: slog::Logger, +} + +impl Discovery { + pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { + let log = log.new(o!("Service" => "Libp2p-Discovery")); + + // checks if current ENR matches that found on disk + let local_enr = load_enr(local_key, config, &log)?; + + info!(log, "Local ENR: {}", local_enr.to_base64()); + debug!(log, "Local Node Id: {}", local_enr.node_id()); + + let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address) + .map_err(|e| format!("Discv5 service failed: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in config.boot_nodes.clone() { + debug!( + log, + "Adding node to routing table: {}", + bootnode_enr.node_id() + ); + discovery.add_enr(bootnode_enr); + } + + Ok(Self { + connected_peers: HashSet::new(), + max_peers: config.max_peers, + peer_discovery_delay: Delay::new(Instant::now()), + past_discovery_delay: INITIAL_SEARCH_DELAY, + tcp_port: config.libp2p_port, + discovery, + log, + }) + } + + /// Manually search for peers. This restarts the discovery round, sparking multiple rapid + /// queries. + pub fn discover_peers(&mut self) { + self.past_discovery_delay = INITIAL_SEARCH_DELAY; + self.find_peers(); + } + + /// Add an Enr to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + self.discovery.add_enr(enr); + } + + /// Search for new peers using the underlying discovery mechanism. + fn find_peers(&mut self) { + // pick a random NodeId + let random_node = NodeId::random(); + debug!(self.log, "Searching for peers..."); + self.discovery.find_node(random_node); + + // update the time until next discovery + let delay = { + if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { + self.past_discovery_delay *= 2; + self.past_discovery_delay + } else { + MAX_TIME_BETWEEN_PEER_SEARCHES + } + }; + self.peer_discovery_delay + .reset(Instant::now() + Duration::from_secs(delay)); + } +} + +// Redirect all behaviour events to underlying discovery behaviour. +impl NetworkBehaviour for Discovery +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = as NetworkBehaviour>::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + NetworkBehaviour::new_handler(&mut self.discovery) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + // Let discovery track possible known peers. + self.discovery.addresses_of_peer(peer_id) + } + + fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.insert(peer_id); + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.remove(peer_id); + } + + fn inject_replaced( + &mut self, + _peer_id: PeerId, + _closed: ConnectedPoint, + _opened: ConnectedPoint, + ) { + // discv5 doesn't implement + } + + fn inject_node_event( + &mut self, + _peer_id: PeerId, + _event: ::OutEvent, + ) { + // discv5 doesn't implement + } + + fn poll( + &mut self, + params: &mut impl PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + // search for peers if it is time + loop { + match self.peer_discovery_delay.poll() { + Ok(Async::Ready(_)) => { + if self.connected_peers.len() < self.max_peers { + self.find_peers(); + } + } + Ok(Async::NotReady) => break, + Err(e) => { + warn!(self.log, "Discovery peer search failed: {:?}", e); + } + } + } + + // Poll discovery + loop { + match self.discovery.poll(params) { + Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match event { + Discv5Event::Discovered(_enr) => { + // not concerned about FINDNODE results, rather the result of an entire + // query. + } + Discv5Event::SocketUpdated(socket) => { + info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip())); + let mut address = Multiaddr::from(socket.ip()); + address.push(Protocol::Tcp(self.tcp_port)); + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + }); + } + Discv5Event::FindNodeResult { closer_peers, .. } => { + debug!(self.log, "Discv5 query found {} peers", closer_peers.len()); + if closer_peers.is_empty() { + debug!(self.log, "Discv5 random query yielded empty results"); + } + for peer_id in closer_peers { + // if we need more peers, attempt a connection + if self.connected_peers.len() < self.max_peers + && self.connected_peers.get(&peer_id).is_none() + { + debug!(self.log, "Discv5: Peer discovered"; "Peer"=> format!("{:?}", peer_id)); + return Async::Ready(NetworkBehaviourAction::DialPeer { + peer_id, + }); + } + } + } + _ => {} + } + } + // discv5 does not output any other NetworkBehaviourAction + Async::Ready(_) => {} + Async::NotReady => break, + } + } + Async::NotReady + } +} + +/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none +/// exists, generates a new one. +/// +/// If an ENR exists, with the same NodeId and IP address, we use the disk-generated one as its +/// ENR sequence will be equal or higher than a newly generated one. +fn load_enr( + local_key: &Keypair, + config: &NetworkConfig, + log: &slog::Logger, +) -> Result { + // Build the local ENR. + // Note: Discovery should update the ENR record's IP to the external IP as seen by the + // majority of our peers. + let mut local_enr = EnrBuilder::new() + .ip(config.discovery_address.into()) + .tcp(config.libp2p_port) + .udp(config.discovery_port) + .build(&local_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e))?; + + let enr_f = config.network_dir.join(ENR_FILENAME); + if let Ok(mut enr_file) = File::open(enr_f.clone()) { + let mut enr_string = String::new(); + match enr_file.read_to_string(&mut enr_string) { + Err(_) => debug!(log, "Could not read ENR from file"), + Ok(_) => { + match Enr::from_str(&enr_string) { + Ok(enr) => { + debug!(log, "ENR found in file: {:?}", enr_f); + + if enr.node_id() == local_enr.node_id() { + if enr.ip() == config.discovery_address.into() + && enr.tcp() == Some(config.libp2p_port) + && enr.udp() == Some(config.discovery_port) + { + debug!(log, "ENR loaded from file"); + // the stored ENR has the same configuration, use it + return Ok(enr); + } + + // same node id, different configuration - update the sequence number + let new_seq_no = enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?; + local_enr.set_seq(new_seq_no, local_key).map_err(|e| { + format!("Could not update ENR sequence number: {:?}", e) + })?; + debug!(log, "ENR sequence number increased to: {}", new_seq_no); + } + } + Err(e) => { + warn!(log, "ENR from file could not be decoded: {:?}", e); + } + } + } + } + } + + // write ENR to disk + let _ = std::fs::create_dir_all(&config.network_dir); + match File::create(enr_f.clone()) + .and_then(|mut f| f.write_all(&local_enr.to_base64().as_bytes())) + { + Ok(_) => { + debug!(log, "ENR written to disk"); + } + Err(e) => { + warn!( + log, + "Could not write ENR to file: {:?}. Error: {}", enr_f, e + ); + } + } + Ok(local_enr) +} diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 659d6b01c1..7a3b2e632d 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -4,12 +4,18 @@ /// This crate builds and manages the libp2p services required by the beacon node. pub mod behaviour; mod config; +mod discovery; pub mod error; pub mod rpc; mod service; pub use behaviour::PubsubMessage; -pub use config::Config as NetworkConfig; +pub use config::{ + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, +}; +pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; +pub use libp2p::multiaddr; +pub use libp2p::Multiaddr; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, @@ -17,5 +23,3 @@ pub use libp2p::{ pub use rpc::RPCEvent; pub use service::Libp2pEvent; pub use service::Service; -pub use types::multiaddr; -pub use types::Multiaddr; diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 57d7dadbe5..2d303469c6 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -94,7 +94,7 @@ where fn poll( &mut self, - _: &mut PollParameters<'_>, + _: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 2f461988ac..7afded3ac7 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -11,7 +11,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; const MAX_READ_SIZE: usize = 4_194_304; // 4M /// Implementation of the `ConnectionUpgrade` for the rpc protocol. - #[derive(Debug, Clone)] pub struct RPCProtocol; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 18f7ca98ce..69f8a1ca57 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,25 +3,30 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; +use crate::{TopicBuilder, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ - identity, + identity::Keypair, + multiaddr::Multiaddr, muxing::StreamMuxerBox, nodes::Substream, transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, }; -use libp2p::identify::protocol::IdentifyInfo; use libp2p::{core, secio, PeerId, Swarm, Transport}; use slog::{debug, info, trace, warn}; +use std::fs::File; +use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::{TopicBuilder, TopicHash}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = Behaviour>; +const NETWORK_KEY_FILENAME: &str = "key"; + /// The configuration and state of the libp2p components for the beacon node. pub struct Service { /// The libp2p Swarm handler. @@ -35,59 +40,52 @@ pub struct Service { impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { - debug!(log, "Libp2p Service starting"); + debug!(log, "Network-libp2p Service starting"); - // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this - // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 - // TODO: Save and recover node key from disk - let local_private_key = identity::Keypair::generate_secp256k1(); + // load the private key from CLI flag, disk or generate a new one + let local_private_key = load_private_key(&config, &log); - let local_public_key = local_private_key.public(); let local_peer_id = PeerId::from(local_private_key.public()); info!(log, "Local peer id: {:?}", local_peer_id); let mut swarm = { - // Set up the transport - let transport = build_transport(local_private_key); - // Set up gossipsub routing - let behaviour = Behaviour::new(local_public_key.clone(), &config, &log); - // Set up Topology - let topology = local_peer_id.clone(); - Swarm::new(transport, behaviour, topology) + // Set up the transport - tcp/ws with secio and mplex/yamux + let transport = build_transport(local_private_key.clone()); + // Lighthouse network behaviour + let behaviour = Behaviour::new(&local_private_key, &config, &log)?; + Swarm::new(transport, behaviour, local_peer_id.clone()) }; - // listen on all addresses - for address in config - .listen_addresses() - .map_err(|e| format!("Invalid listen multiaddr: {}", e))? - { - match Swarm::listen_on(&mut swarm, address.clone()) { - Ok(mut listen_addr) => { - listen_addr.append(Protocol::P2p(local_peer_id.clone().into())); - info!(log, "Listening on: {}", listen_addr); - } - Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err), - }; - } - // connect to boot nodes - these are currently stored as multiaddrs - // Once we have discovery, can set to peerId - for bootnode in config - .boot_nodes() - .map_err(|e| format!("Invalid boot node multiaddr: {:?}", e))? - { - match Swarm::dial_addr(&mut swarm, bootnode.clone()) { - Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode), - Err(err) => debug!( - log, - "Could not connect to bootnode: {} error: {:?}", bootnode, err - ), - }; - } + // listen on the specified address + let listen_multiaddr = { + let mut m = Multiaddr::from(config.listen_address); + m.push(Protocol::Tcp(config.libp2p_port)); + m + }; + + match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { + Ok(_) => { + let mut log_address = listen_multiaddr; + log_address.push(Protocol::P2p(local_peer_id.clone().into())); + info!(log, "Listening on: {}", log_address); + } + Err(err) => warn!( + log, + "Cannot listen on: {} because: {:?}", listen_multiaddr, err + ), + }; // subscribe to default gossipsub topics + let mut topics = vec![]; + //TODO: Handle multiple shard attestations. For now we simply use a separate topic for + //attestations + topics.push(BEACON_ATTESTATION_TOPIC.to_string()); + topics.push(BEACON_PUBSUB_TOPIC.to_string()); + topics.append(&mut config.topics.clone()); + let mut subscribed_topics = vec![]; - for topic in config.topics { - let t = TopicBuilder::new(topic.to_string()).build(); + for topic in topics { + let t = TopicBuilder::new(topic.clone()).build(); if swarm.subscribe(t) { trace!(log, "Subscribed to topic: {:?}", topic); subscribed_topics.push(topic); @@ -135,9 +133,6 @@ impl Stream for Service { BehaviourEvent::PeerDialed(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); } - BehaviourEvent::Identified(peer_id, info) => { - return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info)))); - } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, @@ -150,7 +145,7 @@ impl Stream for Service { /// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and /// mplex or yamux as the multiplexing layer. -fn build_transport(local_private_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { +fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // in the future. let transport = libp2p::tcp::TcpConfig::new(); @@ -187,8 +182,6 @@ pub enum Libp2pEvent { RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. PeerDialed(PeerId), - /// Received information about a peer on the network. - Identified(PeerId, Box), /// Received pubsub message. PubsubMessage { source: PeerId, @@ -196,3 +189,51 @@ pub enum Libp2pEvent { message: Box, }, } + +/// Loads a private key from disk. If this fails, a new key is +/// generated and is then saved to disk. +/// +/// Currently only secp256k1 keys are allowed, as these are the only keys supported by discv5. +fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair { + // TODO: Currently using secp256k1 keypairs - currently required for discv5 + // check for key from disk + let network_key_f = config.network_dir.join(NETWORK_KEY_FILENAME); + if let Ok(mut network_key_file) = File::open(network_key_f.clone()) { + let mut key_bytes: Vec = Vec::with_capacity(36); + match network_key_file.read_to_end(&mut key_bytes) { + Err(_) => debug!(log, "Could not read network key file"), + Ok(_) => { + // only accept secp256k1 keys for now + if let Ok(secret_key) = + libp2p::core::identity::secp256k1::SecretKey::from_bytes(&mut key_bytes) + { + let kp: libp2p::core::identity::secp256k1::Keypair = secret_key.into(); + debug!(log, "Loaded network key from disk."); + return Keypair::Secp256k1(kp); + } else { + debug!(log, "Network key file is not a valid secp256k1 key"); + } + } + } + } + + // if a key could not be loaded from disk, generate a new one and save it + let local_private_key = Keypair::generate_secp256k1(); + if let Keypair::Secp256k1(key) = local_private_key.clone() { + let _ = std::fs::create_dir_all(&config.network_dir); + match File::create(network_key_f.clone()) + .and_then(|mut f| f.write_all(&key.secret().to_bytes())) + { + Ok(_) => { + debug!(log, "New network key generated and written to disk"); + } + Err(e) => { + warn!( + log, + "Could not write node key to file: {:?}. Error: {}", network_key_f, e + ); + } + } + } + local_private_key +} diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml index 45e0349f55..9a7a4b0a56 100644 --- a/beacon_node/http_server/Cargo.toml +++ b/beacon_node/http_server/Cargo.toml @@ -13,7 +13,7 @@ network = { path = "../network" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz = { path = "../../eth2/utils/ssz" } slot_clock = { path = "../../eth2/utils/slot_clock" } protos = { path = "../../protos" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index ebf71aa4e0..23fbdd7d92 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,8 +13,8 @@ store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } -ssz = { path = "../../eth2/utils/ssz" } +slog = { version = "^2.2.3" } +eth2_ssz = { path = "../../eth2/utils/ssz" } tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25" error-chain = "0.12.0" diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 9c71a60f7e..b2ecc1a0b3 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -4,6 +4,7 @@ use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use eth2_libp2p::Service as LibP2PService; +use eth2_libp2p::Topic; use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; @@ -13,7 +14,6 @@ use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::Topic; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { @@ -126,12 +126,6 @@ fn network_service( .send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "failed to send rpc to handler")?; } - Libp2pEvent::Identified(peer_id, info) => { - debug!( - log, - "We have identified peer: {:?} with {:?}", peer_id, info - ); - } Libp2pEvent::PubsubMessage { source, message, .. } => { diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 90c354cfd5..8cc3dd65df 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -1,7 +1,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::PeerId; -use slog::{debug, error}; +use slog::error; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tree_hash::TreeHash; @@ -22,7 +23,7 @@ use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; pub struct ImportQueue { pub chain: Arc>, /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. - pub partials: Vec, + partials: HashMap, /// Time before a queue entry is considered state. pub stale_time: Duration, /// Logging @@ -34,7 +35,7 @@ impl ImportQueue { pub fn new(chain: Arc>, stale_time: Duration, log: slog::Logger) -> Self { Self { chain, - partials: vec![], + partials: HashMap::new(), stale_time, log, } @@ -52,7 +53,7 @@ impl ImportQueue { let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self .partials .iter() - .filter_map(|partial| partial.clone().complete()) + .filter_map(|(_, partial)| partial.clone().complete()) .collect(); // Sort the completable partials to be in ascending slot order. @@ -61,14 +62,14 @@ impl ImportQueue { complete } + pub fn contains_block_root(&self, block_root: Hash256) -> bool { + self.partials.contains_key(&block_root) + } + /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial /// if it exists. pub fn remove(&mut self, block_root: Hash256) -> Option { - let position = self - .partials - .iter() - .position(|p| p.block_root == block_root)?; - Some(self.partials.remove(position)) + self.partials.remove(&block_root) } /// Flushes all stale entries from the queue. @@ -76,31 +77,10 @@ impl ImportQueue { /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the /// past. pub fn remove_stale(&mut self) { - let stale_indices: Vec = self - .partials - .iter() - .enumerate() - .filter_map(|(i, partial)| { - if partial.inserted + self.stale_time <= Instant::now() { - Some(i) - } else { - None - } - }) - .collect(); + let stale_time = self.stale_time; - if !stale_indices.is_empty() { - debug!( - self.log, - "ImportQueue removing stale entries"; - "stale_items" => stale_indices.len(), - "stale_time_seconds" => self.stale_time.as_secs() - ); - } - - stale_indices.iter().for_each(|&i| { - self.partials.remove(i); - }); + self.partials + .retain(|_, partial| partial.inserted + stale_time > Instant::now()) } /// Returns `true` if `self.chain` has not yet processed this block. @@ -122,27 +102,30 @@ impl ImportQueue { block_roots: &[BlockRootSlot], sender: PeerId, ) -> Vec { - let new_roots: Vec = block_roots + let new_block_root_slots: Vec = block_roots .iter() + // Ignore any roots already stored in the queue. + .filter(|brs| !self.contains_block_root(brs.block_root)) // Ignore any roots already processed by the chain. .filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) - // Ignore any roots already stored in the queue. - .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) .cloned() .collect(); - new_roots.iter().for_each(|brs| { - self.partials.push(PartialBeaconBlock { - slot: brs.slot, - block_root: brs.block_root, - sender: sender.clone(), - header: None, - body: None, - inserted: Instant::now(), - }) - }); + self.partials.extend( + new_block_root_slots + .iter() + .map(|brs| PartialBeaconBlock { + slot: brs.slot, + block_root: brs.block_root, + sender: sender.clone(), + header: None, + body: None, + inserted: Instant::now(), + }) + .map(|partial| (partial.block_root, partial)), + ); - new_roots + new_block_root_slots } /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for @@ -170,7 +153,7 @@ impl ImportQueue { if self.chain_has_not_seen_block(&block_root) { self.insert_header(block_root, header, sender.clone()); - required_bodies.push(block_root) + required_bodies.push(block_root); } } @@ -197,31 +180,20 @@ impl ImportQueue { /// If the header already exists, the `inserted` time is set to `now` and not other /// modifications are made. fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { - if let Some(i) = self - .partials - .iter() - .position(|p| p.block_root == block_root) - { - // Case 1: there already exists a partial with a matching block root. - // - // The `inserted` time is set to now and the header is replaced, regardless of whether - // it existed or not. - self.partials[i].header = Some(header); - self.partials[i].inserted = Instant::now(); - } else { - // Case 2: there was no partial with a matching block root. - // - // A new partial is added. This case permits adding a header without already known the - // root. - self.partials.push(PartialBeaconBlock { + self.partials + .entry(block_root) + .and_modify(|partial| { + partial.header = Some(header.clone()); + partial.inserted = Instant::now(); + }) + .or_insert_with(|| PartialBeaconBlock { slot: header.slot, block_root, header: Some(header), body: None, inserted: Instant::now(), sender, - }) - } + }); } /// Updates an existing partial with the `body`. @@ -232,7 +204,7 @@ impl ImportQueue { fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { let body_root = Hash256::from_slice(&body.tree_hash_root()[..]); - self.partials.iter_mut().for_each(|mut p| { + self.partials.iter_mut().for_each(|(_, mut p)| { if let Some(header) = &mut p.header { if body_root == header.block_body_root { p.inserted = Instant::now(); @@ -261,15 +233,10 @@ impl ImportQueue { sender, }; - if let Some(i) = self - .partials - .iter() - .position(|p| p.block_root == block_root) - { - self.partials[i] = partial; - } else { - self.partials.push(partial) - } + self.partials + .entry(block_root) + .and_modify(|existing_partial| *existing_partial = partial.clone()) + .or_insert(partial); } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 0a082afcf7..5899e5aea7 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -4,7 +4,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, error, info, o, warn}; +use slog::{debug, error, info, o, trace, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -17,7 +17,7 @@ use types::{ const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. -const QUEUE_STALE_SECS: u64 = 600; +const QUEUE_STALE_SECS: u64 = 6; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. @@ -72,7 +72,6 @@ pub struct SimpleSync { import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, - /// Sync logger. log: slog::Logger, } @@ -160,96 +159,100 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { - let spec = &self.chain.spec; - let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); - // Disconnect nodes who are on a different network. + let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); + if local.network_id != remote.network_id { + // The node is on a different network, disconnect them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); + network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); - // Disconnect nodes if our finalized epoch is greater than thieirs, and their finalized - // epoch is not in our chain. Viz., they are on another chain. - // - // If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about - // the finalized_root. The logic is akward and I think we're better without it. - } else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch) - && (!self - .chain - .rev_iter_block_roots(local.best_slot) - .any(|(root, _slot)| root == remote.latest_finalized_root)) - && (local.latest_finalized_root != spec.zero_hash) - && (remote.latest_finalized_root != spec.zero_hash) + } else if remote.latest_finalized_epoch <= local.latest_finalized_epoch + && remote.latest_finalized_root != self.chain.spec.zero_hash + && local.latest_finalized_root != self.chain.spec.zero_hash + && (self.root_at_slot(start_slot(remote.latest_finalized_epoch)) + != Some(remote.latest_finalized_root)) { + // The remotes finalized epoch is less than or greater than ours, but the block root is + // different to the one in our chain. + // + // Therefore, the node is on a different chain and we should not communicate with them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), - "reason" => "wrong_finalized_chain" + "reason" => "different finalized chain" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); - // Process handshakes from peers that seem to be on our chain. - } else { - info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); - self.known_peers.insert(peer_id.clone(), remote); - - // If we have equal or better finalized epochs and best slots, we require nothing else from - // this peer. + } else if remote.latest_finalized_epoch < local.latest_finalized_epoch { + // The node has a lower finalized epoch, their chain is not useful to us. There are two + // cases where a node can have a lower finalized epoch: // - // We make an exception when our best slot is 0. Best slot does not indicate wether or - // not there is a block at slot zero. - if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) - && (remote.best_slot <= local.best_slot) - && (local.best_slot > 0) - { - debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id)); - return; - } + // ## The node is on the same chain + // + // If a node is on the same chain but has a lower finalized epoch, their head must be + // lower than ours. Therefore, we have nothing to request from them. + // + // ## The node is on a fork + // + // If a node is on a fork that has a lower finalized epoch, switching to that fork would + // cause us to revert a finalized block. This is not permitted, therefore we have no + // interest in their blocks. + debug!( + self.log, + "NaivePeer"; + "peer" => format!("{:?}", peer_id), + "reason" => "lower finalized epoch" + ); + } else if self + .chain + .store + .exists::(&remote.best_root) + .unwrap_or_else(|_| false) + { + // If the node's best-block is already known to us, we have nothing to request. + debug!( + self.log, + "NaivePeer"; + "peer" => format!("{:?}", peer_id), + "reason" => "best block is known" + ); + } else { + // The remote node has an equal or great finalized epoch and we don't know it's head. + // + // Therefore, there are some blocks between the local finalized epoch and the remote + // head that are worth downloading. + debug!(self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id)); - // If the remote has a higher finalized epoch, request all block roots from our finalized - // epoch through to its best slot. - if remote.latest_finalized_epoch > local.latest_finalized_epoch { - debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id)); - let start_slot = local - .latest_finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - let required_slots = remote.best_slot - start_slot; + let start_slot = local + .latest_finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + let required_slots = remote.best_slot - start_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - // If the remote has a greater best slot, request the roots between our best slot and their - // best slot. - } else if remote.best_slot > local.best_slot { - debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id)); - let start_slot = local - .latest_finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - let required_slots = remote.best_slot - start_slot; - - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - } else { - debug!(self.log, "Nothing to request from peer"; "peer" => format!("{:?}", peer_id)); - } + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); } } + fn root_at_slot(&self, target_slot: Slot) -> Option { + self.chain + .rev_iter_block_roots(target_slot) + .take(1) + .find(|(_root, slot)| *slot == target_slot) + .map(|(root, _slot)| root) + } + /// Handle a `BeaconBlockRoots` request from the peer. pub fn on_beacon_block_roots_request( &mut self, @@ -275,11 +278,13 @@ impl SimpleSync { .collect(); if roots.len() as u64 != req.count { - debug!( + warn!( self.log, "BlockRootsRequest"; "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", + "start_slot" => req.start_slot, + "current_slot" => self.chain.current_state().slot, "requested" => req.count, "returned" => roots.len(), ); @@ -351,7 +356,7 @@ impl SimpleSync { BeaconBlockHeadersRequest { start_root: first.block_root, start_slot: first.slot, - max_headers: (last.slot - first.slot + 1).as_u64(), + max_headers: (last.slot - first.slot).as_u64(), skip_slots: 0, }, network, @@ -433,7 +438,9 @@ impl SimpleSync { .import_queue .enqueue_headers(res.headers, peer_id.clone()); - self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + if !block_roots.is_empty() { + self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + } } /// Handle a `BeaconBlockBodies` request from the peer. @@ -518,9 +525,32 @@ impl SimpleSync { { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, - BlockProcessingOutcome::ParentUnknown { .. } => { + BlockProcessingOutcome::ParentUnknown { parent } => { + // Clean the stale entries from the queue. + self.import_queue.remove_stale(); + + // Add this block to the queue self.import_queue .enqueue_full_blocks(vec![block], peer_id.clone()); + trace!( + self.log, + "NewGossipBlock"; + "peer" => format!("{:?}", peer_id), + ); + + // Unless the parent is in the queue, request the parent block from the peer. + // + // It is likely that this is duplicate work, given we already send a hello + // request. However, I believe there are some edge-cases where the hello + // message doesn't suffice, so we perform this request as well. + if !self.import_queue.contains_block_root(parent) { + // Send a hello to learn of the clients best slot so we can then sync the required + // parent(s). + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(hello_message(&self.chain)), + ); + } SHOULD_FORWARD_GOSSIP_BLOCK } @@ -696,7 +726,7 @@ impl SimpleSync { if let Ok(outcome) = processing_result { match outcome { BlockProcessingOutcome::Processed { block_root } => { - info!( + debug!( self.log, "Imported block from network"; "source" => source, "slot" => block.slot, @@ -713,28 +743,19 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), ); - // Send a hello to learn of the clients best slot so we can then sync the require - // parent(s). - network.send_rpc_request( - peer_id.clone(), - RPCRequest::Hello(hello_message(&self.chain)), - ); - - // Explicitly request the parent block from the peer. + // Unless the parent is in the queue, request the parent block from the peer. // // It is likely that this is duplicate work, given we already send a hello // request. However, I believe there are some edge-cases where the hello // message doesn't suffice, so we perform this request as well. - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: parent, - start_slot: block.slot - 1, - max_headers: 1, - skip_slots: 0, - }, - network, - ) + if !self.import_queue.contains_block_root(parent) { + // Send a hello to learn of the clients best slot so we can then sync the require + // parent(s). + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(hello_message(&self.chain)), + ); + } } BlockProcessingOutcome::FutureSlot { present_slot, diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml index d707cc36d4..a374927966 100644 --- a/beacon_node/rpc/Cargo.toml +++ b/beacon_node/rpc/Cargo.toml @@ -11,7 +11,7 @@ network = { path = "../network" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz = { path = "../../eth2/utils/ssz" } slot_clock = { path = "../../eth2/utils/slot_clock" } protos = { path = "../../protos" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 0f585b7e75..86f4331f14 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,5 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; +use eth2_libp2p::TopicBuilder; +use eth2_libp2p::SHARD_TOPIC_PREFIX; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -136,11 +138,10 @@ impl AttestationService for AttestationServiceInstance { "type" => "valid_attestation", ); - // TODO: Obtain topics from the network service properly. - let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); + // valid attestation, propagate to the network + let topic = TopicBuilder::new(SHARD_TOPIC_PREFIX).build(); let message = PubsubMessage::Attestation(attestation); - // Publish the attestation to the p2p network via gossipsub. self.network_chan .send(NetworkMessage::Publish { topics: vec![topic], @@ -150,7 +151,7 @@ impl AttestationService for AttestationServiceInstance { error!( self.log, "PublishAttestation"; - "type" => "failed to publish to gossipsub", + "type" => "failed to publish attestation to gossipsub", "error" => format!("{:?}", e) ); }); diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 533fd285a3..cdf46a1ab5 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use crossbeam_channel; -use eth2_libp2p::PubsubMessage; +use eth2_libp2p::BEACON_PUBSUB_TOPIC; +use eth2_libp2p::{PubsubMessage, TopicBuilder}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -104,9 +105,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { "block_root" => format!("{}", block_root), ); - // TODO: Obtain topics from the network service properly. - let topic = - types::TopicBuilder::new("beacon_chain".to_string()).build(); + // get the network topic to send on + let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); let message = PubsubMessage::Block(block); // Publish the block to the p2p network via gossipsub. diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index f2f1b2abf3..3e6fd3e73e 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -60,8 +60,8 @@ pub fn start_server( }; let attestation_service = { let instance = AttestationServiceInstance { - chain: beacon_chain.clone(), network_chan, + chain: beacon_chain.clone(), log: log.clone(), }; create_attestation_service(instance) diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index d6274befc8..791feae54b 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -1,11 +1,11 @@ -extern crate slog; - mod run; use clap::{App, Arg}; use client::{ClientConfig, Eth2Config}; -use eth2_config::{get_data_dir, read_from_file, write_to_file}; -use slog::{crit, o, Drain}; +use env_logger::{Builder, Env}; +use eth2_config::{read_from_file, write_to_file}; +use slog::{crit, o, Drain, Level}; +use std::fs; use std::path::PathBuf; pub const DEFAULT_DATA_DIR: &str = ".lighthouse"; @@ -14,10 +14,8 @@ pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; fn main() { - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::CompactFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - let logger = slog::Logger::root(drain, o!()); + // debugging output for libp2p and external crates + Builder::from_env(Env::default()).init(); let matches = App::new("Lighthouse") .version(version::version().as_str()) @@ -30,21 +28,48 @@ fn main() { .value_name("DIR") .help("Data directory for keys and databases.") .takes_value(true) - .default_value(DEFAULT_DATA_DIR), ) // network related arguments .arg( Arg::with_name("listen-address") .long("listen-address") - .value_name("Listen Address") - .help("One or more comma-delimited multi-addresses to listen for p2p connections.") + .value_name("Address") + .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") + .takes_value(true), + ) + .arg( + Arg::with_name("maxpeers") + .long("maxpeers") + .help("The maximum number of peers (default 10).") .takes_value(true), ) .arg( Arg::with_name("boot-nodes") .long("boot-nodes") + .allow_hyphen_values(true) .value_name("BOOTNODES") - .help("One or more comma-delimited multi-addresses to bootstrap the p2p network.") + .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .long("port") + .value_name("Lighthouse Port") + .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") + .takes_value(true), + ) + .arg( + Arg::with_name("discovery-port") + .long("disc-port") + .value_name("DiscoveryPort") + .help("The discovery UDP port.") + .takes_value(true), + ) + .arg( + Arg::with_name("discovery-address") + .long("discovery-address") + .value_name("Address") + .help("The IP address to broadcast to other peers on how to reach this node.") .takes_value(true), ) // rpc related arguments @@ -58,14 +83,13 @@ fn main() { .arg( Arg::with_name("rpc-address") .long("rpc-address") - .value_name("RPCADDRESS") + .value_name("Address") .help("Listen address for RPC endpoint.") .takes_value(true), ) .arg( Arg::with_name("rpc-port") .long("rpc-port") - .value_name("RPCPORT") .help("Listen port for RPC endpoint.") .takes_value(true), ) @@ -73,21 +97,19 @@ fn main() { .arg( Arg::with_name("http") .long("http") - .value_name("HTTP") .help("Enable the HTTP server.") .takes_value(false), ) .arg( Arg::with_name("http-address") .long("http-address") - .value_name("HTTPADDRESS") + .value_name("Address") .help("Listen address for the HTTP server.") .takes_value(true), ) .arg( Arg::with_name("http-port") .long("http-port") - .value_name("HTTPPORT") .help("Listen port for the HTTP server.") .takes_value(true), ) @@ -116,19 +138,60 @@ fn main() { .short("r") .help("When present, genesis will be within 30 minutes prior. Only for testing"), ) + .arg( + Arg::with_name("verbosity") + .short("v") + .multiple(true) + .help("Sets the verbosity level") + .takes_value(true), + ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(logger, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + // build the initial logger + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build(); + + let drain = match matches.occurrences_of("verbosity") { + 0 => drain.filter_level(Level::Info), + 1 => drain.filter_level(Level::Debug), + 2 => drain.filter_level(Level::Trace), + _ => drain.filter_level(Level::Info), + }; + + let log = slog::Logger::root(drain.fuse(), o!()); + + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); - // Attempt to lead the `ClientConfig` from disk. + // Attempt to load the `ClientConfig` from disk. // // If file doesn't exist, create a new, default one. let mut client_config = match read_from_file::(client_config_path.clone()) { @@ -136,13 +199,13 @@ fn main() { Ok(None) => { let default = ClientConfig::default(); if let Err(e) = write_to_file(client_config_path, &default) { - crit!(logger, "Failed to write default ClientConfig to file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to write default ClientConfig to file"; "error" => format!("{:?}", e)); return; } default } Err(e) => { - crit!(logger, "Failed to load a ChainConfig file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to load a ChainConfig file"; "error" => format!("{:?}", e)); return; } }; @@ -154,7 +217,7 @@ fn main() { match client_config.apply_cli_args(&matches) { Ok(()) => (), Err(s) => { - crit!(logger, "Failed to parse ClientConfig CLI arguments"; "error" => s); + crit!(log, "Failed to parse ClientConfig CLI arguments"; "error" => s); return; } }; @@ -173,13 +236,13 @@ fn main() { _ => unreachable!(), // Guarded by slog. }; if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(logger, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); return; } default } Err(e) => { - crit!(logger, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); return; } }; @@ -188,13 +251,13 @@ fn main() { match eth2_config.apply_cli_args(&matches) { Ok(()) => (), Err(s) => { - crit!(logger, "Failed to parse Eth2Config CLI arguments"; "error" => s); + crit!(log, "Failed to parse Eth2Config CLI arguments"; "error" => s); return; } }; - match run::run_beacon_node(client_config, eth2_config, &logger) { + match run::run_beacon_node(client_config, eth2_config, &log) { Ok(_) => {} - Err(e) => crit!(logger, "Beacon node failed to start"; "reason" => format!("{:}", e)), + Err(e) => crit!(log, "Beacon node failed to start"; "reason" => format!("{:}", e)), } } diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 834f9a4281..51fa161549 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -41,6 +41,15 @@ pub fn run_beacon_node( "This software is EXPERIMENTAL and provides no guarantees or warranties." ); + info!( + log, + "Starting beacon node"; + "p2p_listen_address" => format!("{:?}", &other_client_config.network.listen_address), + "data_dir" => format!("{:?}", other_client_config.data_dir()), + "spec_constants" => &spec_constants, + "db_type" => &other_client_config.db_type, + ); + let result = match (db_type.as_str(), spec_constants.as_str()) { ("disk", "minimal") => run::>( &db_path, @@ -80,17 +89,6 @@ pub fn run_beacon_node( } }; - if result.is_ok() { - info!( - log, - "Started beacon node"; - "p2p_listen_addresses" => format!("{:?}", &other_client_config.network.listen_addresses()), - "data_dir" => format!("{:?}", other_client_config.data_dir()), - "spec_constants" => &spec_constants, - "db_type" => &other_client_config.db_type, - ); - } - result } diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index a95dafa908..6dcb771d21 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -14,7 +14,7 @@ bytes = "0.4.10" db-key = "0.0.5" leveldb = "0.8.4" parking_lot = "0.7" -ssz = { path = "../../eth2/utils/ssz" } -ssz_derive = { path = "../../eth2/utils/ssz_derive" } +eth2_ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" } tree_hash = { path = "../../eth2/utils/tree_hash" } types = { path = "../../eth2/types" } diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index cf50d671bf..76807ce8f9 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -139,8 +139,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { Err(BeaconStateError::SlotOutOfBounds) => { // Read a `BeaconState` from the store that has access to prior historical root. let beacon_state: BeaconState = { - // Load the earlier state from disk. Skip forward one slot, because a state - // doesn't return it's own state root. + // Load the earliest state from disk. let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; self.store.get(&new_state_root).ok()? diff --git a/eth2/lmd_ghost/Cargo.toml b/eth2/lmd_ghost/Cargo.toml index 788708faa2..c21af693ee 100644 --- a/eth2/lmd_ghost/Cargo.toml +++ b/eth2/lmd_ghost/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] parking_lot = "0.7" store = { path = "../../beacon_node/store" } -ssz = { path = "../utils/ssz" } +eth2_ssz = { path = "../utils/ssz" } state_processing = { path = "../state_processing" } types = { path = "../types" } log = "0.4.6" diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index 67d13013ca..d1fd181910 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -5,9 +5,11 @@ authors = ["Michael Sproul "] edition = "2018" [dependencies] +boolean-bitfield = { path = "../utils/boolean-bitfield" } int_to_bytes = { path = "../utils/int_to_bytes" } itertools = "0.8" parking_lot = "0.7" types = { path = "../types" } state_processing = { path = "../state_processing" } -ssz = { path = "../utils/ssz" } +eth2_ssz = { path = "../utils/ssz" } +eth2_ssz_derive = { path = "../utils/ssz_derive" } diff --git a/eth2/operation_pool/src/attestation.rs b/eth2/operation_pool/src/attestation.rs new file mode 100644 index 0000000000..a2f71c3a48 --- /dev/null +++ b/eth2/operation_pool/src/attestation.rs @@ -0,0 +1,91 @@ +use crate::max_cover::MaxCover; +use boolean_bitfield::BooleanBitfield; +use types::{Attestation, BeaconState, EthSpec}; + +pub struct AttMaxCover<'a> { + /// Underlying attestation. + att: &'a Attestation, + /// Bitfield of validators that are covered by this attestation. + fresh_validators: BooleanBitfield, +} + +impl<'a> AttMaxCover<'a> { + pub fn new(att: &'a Attestation, fresh_validators: BooleanBitfield) -> Self { + Self { + att, + fresh_validators, + } + } +} + +impl<'a> MaxCover for AttMaxCover<'a> { + type Object = Attestation; + type Set = BooleanBitfield; + + fn object(&self) -> Attestation { + self.att.clone() + } + + fn covering_set(&self) -> &BooleanBitfield { + &self.fresh_validators + } + + /// Sneaky: we keep all the attestations together in one bucket, even though + /// their aggregation bitfields refer to different committees. In order to avoid + /// confusing committees when updating covering sets, we update only those attestations + /// whose shard and epoch match the attestation being included in the solution, by the logic + /// that a shard and epoch uniquely identify a committee. + fn update_covering_set( + &mut self, + best_att: &Attestation, + covered_validators: &BooleanBitfield, + ) { + if self.att.data.shard == best_att.data.shard + && self.att.data.target_epoch == best_att.data.target_epoch + { + self.fresh_validators.difference_inplace(covered_validators); + } + } + + fn score(&self) -> usize { + self.fresh_validators.num_set_bits() + } +} + +/// Extract the validators for which `attestation` would be their earliest in the epoch. +/// +/// The reward paid to a proposer for including an attestation is proportional to the number +/// of validators for which the included attestation is their first in the epoch. The attestation +/// is judged against the state's `current_epoch_attestations` or `previous_epoch_attestations` +/// depending on when it was created, and all those validators who have already attested are +/// removed from the `aggregation_bitfield` before returning it. +// TODO: This could be optimised with a map from validator index to whether that validator has +// attested in each of the current and previous epochs. Currently quadratic in number of validators. +pub fn earliest_attestation_validators( + attestation: &Attestation, + state: &BeaconState, +) -> BooleanBitfield { + // Bitfield of validators whose attestations are new/fresh. + let mut new_validators = attestation.aggregation_bitfield.clone(); + + let state_attestations = if attestation.data.target_epoch == state.current_epoch() { + &state.current_epoch_attestations + } else if attestation.data.target_epoch == state.previous_epoch() { + &state.previous_epoch_attestations + } else { + return BooleanBitfield::from_elem(attestation.aggregation_bitfield.len(), false); + }; + + state_attestations + .iter() + // In a single epoch, an attester should only be attesting for one shard. + // TODO: we avoid including slashable attestations in the state here, + // but maybe we should do something else with them (like construct slashings). + .filter(|existing_attestation| existing_attestation.data.shard == attestation.data.shard) + .for_each(|existing_attestation| { + // Remove the validators who have signed the existing attestation (they are not new) + new_validators.difference_inplace(&existing_attestation.aggregation_bitfield); + }); + + new_validators +} diff --git a/eth2/operation_pool/src/attestation_id.rs b/eth2/operation_pool/src/attestation_id.rs new file mode 100644 index 0000000000..a79023a699 --- /dev/null +++ b/eth2/operation_pool/src/attestation_id.rs @@ -0,0 +1,38 @@ +use int_to_bytes::int_to_bytes8; +use ssz::ssz_encode; +use ssz_derive::{Decode, Encode}; +use types::{AttestationData, BeaconState, ChainSpec, Domain, Epoch, EthSpec}; + +/// Serialized `AttestationData` augmented with a domain to encode the fork info. +#[derive(PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode)] +pub struct AttestationId { + v: Vec, +} + +/// Number of domain bytes that the end of an attestation ID is padded with. +const DOMAIN_BYTES_LEN: usize = 8; + +impl AttestationId { + pub fn from_data( + attestation: &AttestationData, + state: &BeaconState, + spec: &ChainSpec, + ) -> Self { + let mut bytes = ssz_encode(attestation); + let epoch = attestation.target_epoch; + bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); + AttestationId { v: bytes } + } + + pub fn compute_domain_bytes( + epoch: Epoch, + state: &BeaconState, + spec: &ChainSpec, + ) -> Vec { + int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork)) + } + + pub fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { + &self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes + } +} diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index ec7d5aa905..6c6f1e7524 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -1,13 +1,19 @@ -use int_to_bytes::int_to_bytes8; +mod attestation; +mod attestation_id; +mod max_cover; +mod persistence; + +pub use persistence::PersistedOperationPool; + +use attestation::{earliest_attestation_validators, AttMaxCover}; +use attestation_id::AttestationId; use itertools::Itertools; +use max_cover::maximum_cover; use parking_lot::RwLock; -use ssz::ssz_encode; use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, }; -#[cfg(not(test))] -use state_processing::per_block_processing::verify_deposit_merkle_proof; use state_processing::per_block_processing::{ get_slashable_indices_modular, validate_attestation, validate_attestation_time_independent_only, verify_attester_slashing, verify_exit, @@ -16,13 +22,12 @@ use state_processing::per_block_processing::{ }; use std::collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}; use std::marker::PhantomData; -use types::chain_spec::Domain; use types::{ - Attestation, AttestationData, AttesterSlashing, BeaconState, ChainSpec, Deposit, Epoch, - EthSpec, ProposerSlashing, Transfer, Validator, VoluntaryExit, + Attestation, AttesterSlashing, BeaconState, ChainSpec, Deposit, EthSpec, ProposerSlashing, + Transfer, Validator, VoluntaryExit, }; -#[derive(Default)] +#[derive(Default, Debug)] pub struct OperationPool { /// Map from attestation ID (see below) to vectors of attestations. attestations: RwLock>>, @@ -43,71 +48,6 @@ pub struct OperationPool { _phantom: PhantomData, } -/// Serialized `AttestationData` augmented with a domain to encode the fork info. -#[derive(PartialEq, Eq, Clone, Hash, Debug)] -struct AttestationId(Vec); - -/// Number of domain bytes that the end of an attestation ID is padded with. -const DOMAIN_BYTES_LEN: usize = 8; - -impl AttestationId { - fn from_data( - attestation: &AttestationData, - state: &BeaconState, - spec: &ChainSpec, - ) -> Self { - let mut bytes = ssz_encode(attestation); - let epoch = attestation.target_epoch; - bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); - AttestationId(bytes) - } - - fn compute_domain_bytes( - epoch: Epoch, - state: &BeaconState, - spec: &ChainSpec, - ) -> Vec { - int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork)) - } - - fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { - &self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes - } -} - -/// Compute a fitness score for an attestation. -/// -/// The score is calculated by determining the number of *new* attestations that -/// the aggregate attestation introduces, and is proportional to the size of the reward we will -/// receive for including it in a block. -// TODO: this could be optimised with a map from validator index to whether that validator has -// attested in each of the current and previous epochs. Currently quadractic in number of validators. -fn attestation_score(attestation: &Attestation, state: &BeaconState) -> usize { - // Bitfield of validators whose attestations are new/fresh. - let mut new_validators = attestation.aggregation_bitfield.clone(); - - let state_attestations = if attestation.data.target_epoch == state.current_epoch() { - &state.current_epoch_attestations - } else if attestation.data.target_epoch == state.previous_epoch() { - &state.previous_epoch_attestations - } else { - return 0; - }; - - state_attestations - .iter() - // In a single epoch, an attester should only be attesting for one shard. - // TODO: we avoid including slashable attestations in the state here, - // but maybe we should do something else with them (like construct slashings). - .filter(|current_attestation| current_attestation.data.shard == attestation.data.shard) - .for_each(|current_attestation| { - // Remove the validators who have signed the existing attestation (they are not new) - new_validators.difference_inplace(¤t_attestation.aggregation_bitfield); - }); - - new_validators.num_set_bits() -} - #[derive(Debug, PartialEq, Clone)] pub enum DepositInsertStatus { /// The deposit was not already in the pool. @@ -176,29 +116,19 @@ impl OperationPool { let current_epoch = state.current_epoch(); let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec); let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec); - self.attestations - .read() + let reader = self.attestations.read(); + let valid_attestations = reader .iter() .filter(|(key, _)| { key.domain_bytes_match(&prev_domain_bytes) || key.domain_bytes_match(&curr_domain_bytes) }) .flat_map(|(_, attestations)| attestations) - // That are not superseded by an attestation included in the state... - .filter(|attestation| !superior_attestation_exists_in_state(state, attestation)) // That are valid... .filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) - // Scored by the number of new attestations they introduce (descending) - // TODO: need to consider attestations introduced in THIS block - .map(|att| (att, attestation_score(att, state))) - // Don't include any useless attestations (score 0) - .filter(|&(_, score)| score != 0) - .sorted_by_key(|&(_, score)| std::cmp::Reverse(score)) - // Limited to the maximum number of attestations per block - .take(spec.max_attestations as usize) - .map(|(att, _)| att) - .cloned() - .collect() + .map(|att| AttMaxCover::new(att, earliest_attestation_validators(att, state))); + + maximum_cover(valid_attestations, spec.max_attestations as usize) } /// Remove attestations which are too old to be included in a block. @@ -219,20 +149,14 @@ impl OperationPool { /// Add a deposit to the pool. /// /// No two distinct deposits should be added with the same index. - #[cfg_attr(test, allow(unused_variables))] pub fn insert_deposit( &self, deposit: Deposit, - state: &BeaconState, - spec: &ChainSpec, ) -> Result { use DepositInsertStatus::*; match self.deposits.write().entry(deposit.index) { Entry::Vacant(entry) => { - // TODO: fix tests to generate valid merkle proofs - #[cfg(not(test))] - verify_deposit_merkle_proof(state, &deposit, spec)?; entry.insert(deposit); Ok(Fresh) } @@ -240,9 +164,6 @@ impl OperationPool { if entry.get() == &deposit { Ok(Duplicate) } else { - // TODO: fix tests to generate valid merkle proofs - #[cfg(not(test))] - verify_deposit_merkle_proof(state, &deposit, spec)?; Ok(Replaced(Box::new(entry.insert(deposit)))) } } @@ -253,7 +174,9 @@ impl OperationPool { /// /// Take at most the maximum number of deposits, beginning from the current deposit index. pub fn get_deposits(&self, state: &BeaconState, spec: &ChainSpec) -> Vec { - // TODO: might want to re-check the Merkle proof to account for Eth1 forking + // TODO: We need to update the Merkle proofs for existing deposits as more deposits + // are added. It probably makes sense to construct the proofs from scratch when forming + // a block, using fresh info from the ETH1 chain for the current deposit root. let start_idx = state.deposit_index; (start_idx..start_idx + spec.max_deposits) .map(|idx| self.deposits.read().get(&idx).cloned()) @@ -484,34 +407,6 @@ impl OperationPool { } } -/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the -/// given `attestation`. -/// -/// A validator has nothing to gain from re-including an attestation and it adds load to the -/// network. -/// -/// An existing `PendingAttestation` is superior to an existing `attestation` if: -/// -/// - Their `AttestationData` is equal. -/// - `attestation` does not contain any signatures that `PendingAttestation` does not have. -fn superior_attestation_exists_in_state( - state: &BeaconState, - attestation: &Attestation, -) -> bool { - state - .current_epoch_attestations - .iter() - .chain(state.previous_epoch_attestations.iter()) - .any(|existing_attestation| { - let bitfield = &attestation.aggregation_bitfield; - let existing_bitfield = &existing_attestation.aggregation_bitfield; - - existing_attestation.data == attestation.data - && bitfield.intersection(existing_bitfield).num_set_bits() - == bitfield.num_set_bits() - }) -} - /// Filter up to a maximum number of operations out of an iterator. fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec where @@ -547,6 +442,18 @@ fn prune_validator_hash_map( }); } +/// Compare two operation pools. +impl PartialEq for OperationPool { + fn eq(&self, other: &Self) -> bool { + *self.attestations.read() == *other.attestations.read() + && *self.deposits.read() == *other.deposits.read() + && *self.attester_slashings.read() == *other.attester_slashings.read() + && *self.proposer_slashings.read() == *other.proposer_slashings.read() + && *self.voluntary_exits.read() == *other.voluntary_exits.read() + && *self.transfers.read() == *other.transfers.read() + } +} + #[cfg(test)] mod tests { use super::DepositInsertStatus::*; @@ -557,22 +464,15 @@ mod tests { #[test] fn insert_deposit() { let rng = &mut XorShiftRng::from_seed([42; 16]); - let (ref spec, ref state) = test_state(rng); - let op_pool = OperationPool::new(); + let op_pool = OperationPool::::new(); let deposit1 = make_deposit(rng); let mut deposit2 = make_deposit(rng); deposit2.index = deposit1.index; + assert_eq!(op_pool.insert_deposit(deposit1.clone()), Ok(Fresh)); + assert_eq!(op_pool.insert_deposit(deposit1.clone()), Ok(Duplicate)); assert_eq!( - op_pool.insert_deposit(deposit1.clone(), state, spec), - Ok(Fresh) - ); - assert_eq!( - op_pool.insert_deposit(deposit1.clone(), state, spec), - Ok(Duplicate) - ); - assert_eq!( - op_pool.insert_deposit(deposit2, state, spec), + op_pool.insert_deposit(deposit2), Ok(Replaced(Box::new(deposit1))) ); } @@ -591,10 +491,7 @@ mod tests { let deposits = dummy_deposits(rng, start, max_deposits + extra); for deposit in &deposits { - assert_eq!( - op_pool.insert_deposit(deposit.clone(), &state, &spec), - Ok(Fresh) - ); + assert_eq!(op_pool.insert_deposit(deposit.clone()), Ok(Fresh)); } state.deposit_index = start + offset; @@ -610,8 +507,7 @@ mod tests { #[test] fn prune_deposits() { let rng = &mut XorShiftRng::from_seed([42; 16]); - let (spec, state) = test_state(rng); - let op_pool = OperationPool::new(); + let op_pool = OperationPool::::new(); let start1 = 100; // test is super slow in debug mode if this parameter is too high @@ -623,7 +519,7 @@ mod tests { let deposits2 = dummy_deposits(rng, start2, count); for d in deposits1.into_iter().chain(deposits2) { - assert!(op_pool.insert_deposit(d, &state, &spec).is_ok()); + assert!(op_pool.insert_deposit(d).is_ok()); } assert_eq!(op_pool.num_deposits(), 2 * count as usize); @@ -734,15 +630,13 @@ mod tests { state_builder.teleport_to_slot(slot); state_builder.build_caches(&spec).unwrap(); let (state, keypairs) = state_builder.build(); - (state, keypairs, MainnetEthSpec::default_spec()) } #[test] - fn test_attestation_score() { + fn test_earliest_attestation() { let (ref mut state, ref keypairs, ref spec) = attestation_test_state::(1); - let slot = state.slot - 1; let committees = state .get_crosslink_committees_at_slot(slot) @@ -775,9 +669,8 @@ mod tests { assert_eq!( att1.aggregation_bitfield.num_set_bits(), - attestation_score(&att1, state) + earliest_attestation_validators(&att1, state).num_set_bits() ); - state.current_epoch_attestations.push(PendingAttestation { aggregation_bitfield: att1.aggregation_bitfield.clone(), data: att1.data.clone(), @@ -785,7 +678,10 @@ mod tests { proposer_index: 0, }); - assert_eq!(cc.committee.len() - 2, attestation_score(&att2, state)); + assert_eq!( + cc.committee.len() - 2, + earliest_attestation_validators(&att2, state).num_set_bits() + ); } } diff --git a/eth2/operation_pool/src/max_cover.rs b/eth2/operation_pool/src/max_cover.rs new file mode 100644 index 0000000000..75ac140546 --- /dev/null +++ b/eth2/operation_pool/src/max_cover.rs @@ -0,0 +1,189 @@ +/// Trait for types that we can compute a maximum cover for. +/// +/// Terminology: +/// * `item`: something that implements this trait +/// * `element`: something contained in a set, and covered by the covering set of an item +/// * `object`: something extracted from an item in order to comprise a solution +/// See: https://en.wikipedia.org/wiki/Maximum_coverage_problem +pub trait MaxCover { + /// The result type, of which we would eventually like a collection of maximal quality. + type Object; + /// The type used to represent sets. + type Set: Clone; + + /// Extract an object for inclusion in a solution. + fn object(&self) -> Self::Object; + + /// Get the set of elements covered. + fn covering_set(&self) -> &Self::Set; + /// Update the set of items covered, for the inclusion of some object in the solution. + fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set); + /// The quality of this item's covering set, usually its cardinality. + fn score(&self) -> usize; +} + +/// Helper struct to track which items of the input are still available for inclusion. +/// Saves removing elements from the work vector. +struct MaxCoverItem { + item: T, + available: bool, +} + +impl MaxCoverItem { + fn new(item: T) -> Self { + MaxCoverItem { + item, + available: true, + } + } +} + +/// Compute an approximate maximum cover using a greedy algorithm. +/// +/// * Time complexity: `O(limit * items_iter.len())` +/// * Space complexity: `O(item_iter.len())` +pub fn maximum_cover<'a, I, T>(items_iter: I, limit: usize) -> Vec +where + I: IntoIterator, + T: MaxCover, +{ + // Construct an initial vec of all items, marked available. + let mut all_items: Vec<_> = items_iter + .into_iter() + .map(MaxCoverItem::new) + .filter(|x| x.item.score() != 0) + .collect(); + + let mut result = vec![]; + + for _ in 0..limit { + // Select the item with the maximum score. + let (best_item, best_cover) = match all_items + .iter_mut() + .filter(|x| x.available && x.item.score() != 0) + .max_by_key(|x| x.item.score()) + { + Some(x) => { + x.available = false; + (x.item.object(), x.item.covering_set().clone()) + } + None => return result, + }; + + // Update the covering sets of the other items, for the inclusion of the selected item. + // Items covered by the selected item can't be re-covered. + all_items + .iter_mut() + .filter(|x| x.available && x.item.score() != 0) + .for_each(|x| x.item.update_covering_set(&best_item, &best_cover)); + + result.push(best_item); + } + + result +} + +#[cfg(test)] +mod test { + use super::*; + use std::iter::FromIterator; + use std::{collections::HashSet, hash::Hash}; + + impl MaxCover for HashSet + where + T: Clone + Eq + Hash, + { + type Object = Self; + type Set = Self; + + fn object(&self) -> Self { + self.clone() + } + + fn covering_set(&self) -> &Self { + &self + } + + fn update_covering_set(&mut self, _: &Self, other: &Self) { + let mut difference = &*self - other; + std::mem::swap(self, &mut difference); + } + + fn score(&self) -> usize { + self.len() + } + } + + fn example_system() -> Vec> { + vec![ + HashSet::from_iter(vec![3]), + HashSet::from_iter(vec![1, 2, 4, 5]), + HashSet::from_iter(vec![1, 2, 4, 5]), + HashSet::from_iter(vec![1]), + HashSet::from_iter(vec![2, 4, 5]), + ] + } + + #[test] + fn zero_limit() { + let cover = maximum_cover(example_system(), 0); + assert_eq!(cover.len(), 0); + } + + #[test] + fn one_limit() { + let sets = example_system(); + let cover = maximum_cover(sets.clone(), 1); + assert_eq!(cover.len(), 1); + assert_eq!(cover[0], sets[1]); + } + + // Check that even if the limit provides room, we don't include useless items in the soln. + #[test] + fn exclude_zero_score() { + let sets = example_system(); + for k in 2..10 { + let cover = maximum_cover(sets.clone(), k); + assert_eq!(cover.len(), 2); + assert_eq!(cover[0], sets[1]); + assert_eq!(cover[1], sets[0]); + } + } + + fn quality(solution: &[HashSet]) -> usize { + solution.iter().map(HashSet::len).sum() + } + + // Optimal solution is the first three sets (quality 15) but our greedy algorithm + // will select the last three (quality 11). The comment at the end of each line + // shows that set's score at each iteration, with a * indicating that it will be chosen. + #[test] + fn suboptimal() { + let sets = vec![ + HashSet::from_iter(vec![0, 1, 8, 11, 14]), // 5, 3, 2 + HashSet::from_iter(vec![2, 3, 7, 9, 10]), // 5, 3, 2 + HashSet::from_iter(vec![4, 5, 6, 12, 13]), // 5, 4, 2 + HashSet::from_iter(vec![9, 10]), // 4, 4, 2* + HashSet::from_iter(vec![5, 6, 7, 8]), // 4, 4* + HashSet::from_iter(vec![0, 1, 2, 3, 4]), // 5* + ]; + let cover = maximum_cover(sets.clone(), 3); + assert_eq!(quality(&cover), 11); + } + + #[test] + fn intersecting_ok() { + let sets = vec![ + HashSet::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8]), + HashSet::from_iter(vec![1, 2, 3, 9, 10, 11]), + HashSet::from_iter(vec![4, 5, 6, 12, 13, 14]), + HashSet::from_iter(vec![7, 8, 15, 16, 17, 18]), + HashSet::from_iter(vec![1, 2, 9, 10]), + HashSet::from_iter(vec![1, 5, 6, 8]), + HashSet::from_iter(vec![1, 7, 11, 19]), + ]; + let cover = maximum_cover(sets.clone(), 5); + assert_eq!(quality(&cover), 19); + assert_eq!(cover.len(), 5); + } +} diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs new file mode 100644 index 0000000000..aa6df597c8 --- /dev/null +++ b/eth2/operation_pool/src/persistence.rs @@ -0,0 +1,121 @@ +use crate::attestation_id::AttestationId; +use crate::OperationPool; +use parking_lot::RwLock; +use ssz_derive::{Decode, Encode}; +use types::*; + +/// SSZ-serializable version of `OperationPool`. +/// +/// Operations are stored in arbitrary order, so it's not a good idea to compare instances +/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first. +#[derive(Encode, Decode)] +pub struct PersistedOperationPool { + /// Mapping from attestation ID to attestation mappings. + // We could save space by not storing the attestation ID, but it might + // be difficult to make that roundtrip due to eager aggregation. + attestations: Vec<(AttestationId, Vec)>, + deposits: Vec, + /// Attester slashings. + attester_slashings: Vec, + /// Proposer slashings. + proposer_slashings: Vec, + /// Voluntary exits. + voluntary_exits: Vec, + /// Transfers. + transfers: Vec, +} + +impl PersistedOperationPool { + /// Convert an `OperationPool` into serializable form. + pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { + let attestations = operation_pool + .attestations + .read() + .iter() + .map(|(att_id, att)| (att_id.clone(), att.clone())) + .collect(); + + let deposits = operation_pool + .deposits + .read() + .iter() + .map(|(_, d)| d.clone()) + .collect(); + + let attester_slashings = operation_pool + .attester_slashings + .read() + .iter() + .map(|(_, slashing)| slashing.clone()) + .collect(); + + let proposer_slashings = operation_pool + .proposer_slashings + .read() + .iter() + .map(|(_, slashing)| slashing.clone()) + .collect(); + + let voluntary_exits = operation_pool + .voluntary_exits + .read() + .iter() + .map(|(_, exit)| exit.clone()) + .collect(); + + let transfers = operation_pool.transfers.read().iter().cloned().collect(); + + Self { + attestations, + deposits, + attester_slashings, + proposer_slashings, + voluntary_exits, + transfers, + } + } + + /// Reconstruct an `OperationPool`. + pub fn into_operation_pool( + self, + state: &BeaconState, + spec: &ChainSpec, + ) -> OperationPool { + let attestations = RwLock::new(self.attestations.into_iter().collect()); + let deposits = RwLock::new(self.deposits.into_iter().map(|d| (d.index, d)).collect()); + let attester_slashings = RwLock::new( + self.attester_slashings + .into_iter() + .map(|slashing| { + ( + OperationPool::attester_slashing_id(&slashing, state, spec), + slashing, + ) + }) + .collect(), + ); + let proposer_slashings = RwLock::new( + self.proposer_slashings + .into_iter() + .map(|slashing| (slashing.proposer_index, slashing)) + .collect(), + ); + let voluntary_exits = RwLock::new( + self.voluntary_exits + .into_iter() + .map(|exit| (exit.validator_index, exit)) + .collect(), + ); + let transfers = RwLock::new(self.transfers.into_iter().collect()); + + OperationPool { + attestations, + deposits, + attester_slashings, + proposer_slashings, + voluntary_exits, + transfers, + _phantom: Default::default(), + } + } +} diff --git a/eth2/state_processing/Cargo.toml b/eth2/state_processing/Cargo.toml index fa42671d93..e1f98260b9 100644 --- a/eth2/state_processing/Cargo.toml +++ b/eth2/state_processing/Cargo.toml @@ -24,12 +24,12 @@ integer-sqrt = "0.1" itertools = "0.8" log = "0.4" merkle_proof = { path = "../utils/merkle_proof" } -ssz = { path = "../utils/ssz" } -ssz_derive = { path = "../utils/ssz_derive" } +eth2_ssz = { path = "../utils/ssz" } +eth2_ssz_derive = { path = "../utils/ssz_derive" } tree_hash = { path = "../utils/tree_hash" } tree_hash_derive = { path = "../utils/tree_hash_derive" } types = { path = "../types" } rayon = "1.0" [features] -fake_crypto = ["bls/fake_crypto"] \ No newline at end of file +fake_crypto = ["bls/fake_crypto"] diff --git a/eth2/types/Cargo.toml b/eth2/types/Cargo.toml index fa1fe6a6d3..fd6578340a 100644 --- a/eth2/types/Cargo.toml +++ b/eth2/types/Cargo.toml @@ -26,13 +26,12 @@ serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.8" slog = "^2.2.3" -ssz = { path = "../utils/ssz" } -ssz_derive = { path = "../utils/ssz_derive" } +eth2_ssz = { path = "../utils/ssz" } +eth2_ssz_derive = { path = "../utils/ssz_derive" } swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" } test_random_derive = { path = "../utils/test_random_derive" } tree_hash = { path = "../utils/tree_hash" } tree_hash_derive = { path = "../utils/tree_hash_derive" } -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" } [dev-dependencies] env_logger = "0.6.0" diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index 74ce40671b..6073fb32e7 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -104,11 +104,7 @@ pub struct ChainSpec { domain_voluntary_exit: u32, domain_transfer: u32, - /* - * Network specific parameters - * - */ - pub boot_nodes: Vec, + pub boot_nodes: Vec, pub chain_id: u8, } @@ -216,7 +212,7 @@ impl ChainSpec { domain_transfer: 5, /* - * Boot nodes + * Network specific */ boot_nodes: vec![], chain_id: 1, // mainnet chain id @@ -231,12 +227,8 @@ impl ChainSpec { pub fn minimal() -> Self { let genesis_slot = Slot::new(0); - // Note: these bootnodes are placeholders. - // - // Should be updated once static bootnodes exist. - let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9000" - .parse() - .expect("correct multiaddr")]; + // Note: bootnodes to be updated when static nodes exist. + let boot_nodes = vec![]; Self { target_committee_size: 4, diff --git a/eth2/types/src/lib.rs b/eth2/types/src/lib.rs index 4d0ec5fae3..2406c3a18e 100644 --- a/eth2/types/src/lib.rs +++ b/eth2/types/src/lib.rs @@ -82,6 +82,3 @@ pub type ProposerMap = HashMap; pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature}; pub use fixed_len_vec::{typenum, typenum::Unsigned, FixedLenVec}; -pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; -pub use libp2p::multiaddr; -pub use libp2p::Multiaddr; diff --git a/eth2/utils/bls/Cargo.toml b/eth2/utils/bls/Cargo.toml index 4fb1246bea..1275894637 100644 --- a/eth2/utils/bls/Cargo.toml +++ b/eth2/utils/bls/Cargo.toml @@ -13,7 +13,7 @@ rand = "^0.5" serde = "1.0" serde_derive = "1.0" serde_hex = { path = "../serde_hex" } -ssz = { path = "../ssz" } +eth2_ssz = { path = "../ssz" } tree_hash = { path = "../tree_hash" } [features] diff --git a/eth2/utils/boolean-bitfield/Cargo.toml b/eth2/utils/boolean-bitfield/Cargo.toml index dfc97ce771..ceb04a55af 100644 --- a/eth2/utils/boolean-bitfield/Cargo.toml +++ b/eth2/utils/boolean-bitfield/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] cached_tree_hash = { path = "../cached_tree_hash" } serde_hex = { path = "../serde_hex" } -ssz = { path = "../ssz" } +eth2_ssz = { path = "../ssz" } bit-vec = "0.5.0" bit_reverse = "0.1" serde = "1.0" diff --git a/eth2/utils/boolean-bitfield/fuzz/Cargo.toml b/eth2/utils/boolean-bitfield/fuzz/Cargo.toml index 9769fc50ea..6a664ee600 100644 --- a/eth2/utils/boolean-bitfield/fuzz/Cargo.toml +++ b/eth2/utils/boolean-bitfield/fuzz/Cargo.toml @@ -9,7 +9,7 @@ publish = false cargo-fuzz = true [dependencies] -ssz = { path = "../../ssz" } +eth2_ssz = { path = "../../ssz" } [dependencies.boolean-bitfield] path = ".." diff --git a/eth2/utils/boolean-bitfield/src/lib.rs b/eth2/utils/boolean-bitfield/src/lib.rs index 08e56e7c3f..ac6ffa89a5 100644 --- a/eth2/utils/boolean-bitfield/src/lib.rs +++ b/eth2/utils/boolean-bitfield/src/lib.rs @@ -13,7 +13,7 @@ use std::default; /// A BooleanBitfield represents a set of booleans compactly stored as a vector of bits. /// The BooleanBitfield is given a fixed size during construction. Reads outside of the current size return an out-of-bounds error. Writes outside of the current size expand the size of the set. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash)] pub struct BooleanBitfield(BitVec); /// Error represents some reason a request against a bitfield was not satisfied @@ -170,6 +170,7 @@ impl cmp::PartialEq for BooleanBitfield { ssz::ssz_encode(self) == ssz::ssz_encode(other) } } +impl Eq for BooleanBitfield {} /// Create a new bitfield that is a union of two other bitfields. /// diff --git a/eth2/utils/eth2_config/src/lib.rs b/eth2/utils/eth2_config/src/lib.rs index 9d50a95c18..f6ad54c212 100644 --- a/eth2/utils/eth2_config/src/lib.rs +++ b/eth2/utils/eth2_config/src/lib.rs @@ -1,6 +1,5 @@ use clap::ArgMatches; use serde_derive::{Deserialize, Serialize}; -use std::fs; use std::fs::File; use std::io::prelude::*; use std::path::PathBuf; @@ -105,15 +104,3 @@ where Ok(None) } } - -pub fn get_data_dir(args: &ArgMatches, default_data_dir: PathBuf) -> Result { - if let Some(data_dir) = args.value_of("data_dir") { - Ok(PathBuf::from(data_dir)) - } else { - let path = dirs::home_dir() - .ok_or_else(|| "Unable to locate home directory")? - .join(&default_data_dir); - fs::create_dir_all(&path).map_err(|_| "Unable to create data_dir")?; - Ok(path) - } -} diff --git a/eth2/utils/fixed_len_vec/Cargo.toml b/eth2/utils/fixed_len_vec/Cargo.toml index ddfc331030..2750d3acda 100644 --- a/eth2/utils/fixed_len_vec/Cargo.toml +++ b/eth2/utils/fixed_len_vec/Cargo.toml @@ -9,5 +9,5 @@ cached_tree_hash = { path = "../cached_tree_hash" } tree_hash = { path = "../tree_hash" } serde = "1.0" serde_derive = "1.0" -ssz = { path = "../ssz" } +eth2_ssz = { path = "../ssz" } typenum = "1.10" diff --git a/eth2/utils/hashing/.cargo/config b/eth2/utils/hashing/.cargo/config new file mode 100644 index 0000000000..4ec2f3b862 --- /dev/null +++ b/eth2/utils/hashing/.cargo/config @@ -0,0 +1,2 @@ +[target.wasm32-unknown-unknown] +runner = 'wasm-bindgen-test-runner' diff --git a/eth2/utils/hashing/Cargo.toml b/eth2/utils/hashing/Cargo.toml index 78dd70e43c..506b84a6b7 100644 --- a/eth2/utils/hashing/Cargo.toml +++ b/eth2/utils/hashing/Cargo.toml @@ -4,5 +4,14 @@ version = "0.1.0" authors = ["Paul Hauner "] edition = "2018" -[dependencies] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] ring = "0.14.6" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +sha2 = "0.8.0" + +[dev-dependencies] +rustc-hex = "2.0.1" + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test = "0.2.47" diff --git a/eth2/utils/hashing/src/lib.rs b/eth2/utils/hashing/src/lib.rs index a9e286c39a..7214c7421a 100644 --- a/eth2/utils/hashing/src/lib.rs +++ b/eth2/utils/hashing/src/lib.rs @@ -1,7 +1,17 @@ +#[cfg(not(target_arch = "wasm32"))] use ring::digest::{digest, SHA256}; +#[cfg(target_arch = "wasm32")] +use sha2::{Digest, Sha256}; + pub fn hash(input: &[u8]) -> Vec { - digest(&SHA256, input).as_ref().into() + #[cfg(not(target_arch = "wasm32"))] + let h = digest(&SHA256, input).as_ref().into(); + + #[cfg(target_arch = "wasm32")] + let h = Sha256::digest(input).as_ref().into(); + + h } /// Get merkle root of some hashed values - the input leaf nodes is expected to already be hashed @@ -37,19 +47,24 @@ pub fn merkle_root(values: &[Vec]) -> Option> { #[cfg(test)] mod tests { use super::*; - use ring::test; + use rustc_hex::FromHex; - #[test] + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::*; + + #[cfg_attr(not(target_arch = "wasm32"), test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] fn test_hashing() { let input: Vec = b"hello world".as_ref().into(); let output = hash(input.as_ref()); let expected_hex = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"; - let expected: Vec = test::from_hex(expected_hex).unwrap(); + let expected: Vec = expected_hex.from_hex().unwrap(); assert_eq!(expected, output); } - #[test] + #[cfg_attr(not(target_arch = "wasm32"), test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] fn test_merkle_root() { // hash the leaf nodes let mut input = vec![ @@ -79,13 +94,17 @@ mod tests { assert_eq!(&expected[..], output.unwrap().as_slice()); } - #[test] + + #[cfg_attr(not(target_arch = "wasm32"), test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] fn test_empty_input_merkle_root() { let input = vec![]; let output = merkle_root(&input[..]); assert_eq!(None, output); } - #[test] + + #[cfg_attr(not(target_arch = "wasm32"), test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] fn test_odd_leaf_merkle_root() { let input = vec![ hash("a".as_bytes()), diff --git a/eth2/utils/ssz/Cargo.toml b/eth2/utils/ssz/Cargo.toml index 0423b1a8b4..5fd0072647 100644 --- a/eth2/utils/ssz/Cargo.toml +++ b/eth2/utils/ssz/Cargo.toml @@ -1,8 +1,12 @@ [package] -name = "ssz" +name = "eth2_ssz" version = "0.1.0" -authors = ["Paul Hauner "] +authors = ["Paul Hauner "] edition = "2018" +description = "SimpleSerialize (SSZ) as used in Ethereum 2.0" + +[lib] +name = "ssz" [[bench]] name = "benches" @@ -10,7 +14,7 @@ harness = false [dev-dependencies] criterion = "0.2" -ssz_derive = { path = "../ssz_derive" } +eth2_ssz_derive = { path = "../ssz_derive" } [dependencies] bytes = "0.4.9" diff --git a/eth2/utils/ssz/src/decode/impls.rs b/eth2/utils/ssz/src/decode/impls.rs index 0965ee3e54..6f79869459 100644 --- a/eth2/utils/ssz/src/decode/impls.rs +++ b/eth2/utils/ssz/src/decode/impls.rs @@ -34,8 +34,160 @@ impl_decodable_for_uint!(u8, 8); impl_decodable_for_uint!(u16, 16); impl_decodable_for_uint!(u32, 32); impl_decodable_for_uint!(u64, 64); + +#[cfg(target_pointer_width = "32")] +impl_decodable_for_uint!(usize, 32); + +#[cfg(target_pointer_width = "64")] impl_decodable_for_uint!(usize, 64); +macro_rules! impl_decode_for_tuples { + ($( + $Tuple:ident { + $(($idx:tt) -> $T:ident)+ + } + )+) => { + $( + impl<$($T: Decode),+> Decode for ($($T,)+) { + fn is_ssz_fixed_len() -> bool { + $( + <$T as Decode>::is_ssz_fixed_len() && + )* + true + } + + fn ssz_fixed_len() -> usize { + if ::is_ssz_fixed_len() { + $( + <$T as Decode>::ssz_fixed_len() + + )* + 0 + } else { + BYTES_PER_LENGTH_OFFSET + } + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + let mut builder = SszDecoderBuilder::new(bytes); + + $( + builder.register_type::<$T>()?; + )* + + let mut decoder = builder.build()?; + + Ok(($( + decoder.decode_next::<$T>()?, + )* + )) + } + } + )+ + } +} + +impl_decode_for_tuples! { + Tuple2 { + (0) -> A + (1) -> B + } + Tuple3 { + (0) -> A + (1) -> B + (2) -> C + } + Tuple4 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + } + Tuple5 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + } + Tuple6 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + } + Tuple7 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + } + Tuple8 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + } + Tuple9 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + } + Tuple10 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + } + Tuple11 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + (10) -> K + } + Tuple12 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + (10) -> K + (11) -> L + } +} + impl Decode for bool { fn is_ssz_fixed_len() -> bool { true @@ -515,4 +667,15 @@ mod tests { }) ); } + + #[test] + fn tuple() { + assert_eq!(<(u16, u16)>::from_ssz_bytes(&[0, 0, 0, 0]), Ok((0, 0))); + assert_eq!(<(u16, u16)>::from_ssz_bytes(&[16, 0, 17, 0]), Ok((16, 17))); + assert_eq!(<(u16, u16)>::from_ssz_bytes(&[0, 1, 2, 0]), Ok((256, 2))); + assert_eq!( + <(u16, u16)>::from_ssz_bytes(&[255, 255, 0, 0]), + Ok((65535, 0)) + ); + } } diff --git a/eth2/utils/ssz/src/encode/impls.rs b/eth2/utils/ssz/src/encode/impls.rs index 04492a1f2d..3d68d8911a 100644 --- a/eth2/utils/ssz/src/encode/impls.rs +++ b/eth2/utils/ssz/src/encode/impls.rs @@ -24,8 +24,161 @@ impl_encodable_for_uint!(u8, 8); impl_encodable_for_uint!(u16, 16); impl_encodable_for_uint!(u32, 32); impl_encodable_for_uint!(u64, 64); + +#[cfg(target_pointer_width = "32")] +impl_encodable_for_uint!(usize, 32); + +#[cfg(target_pointer_width = "64")] impl_encodable_for_uint!(usize, 64); +// Based on the `tuple_impls` macro from the standard library. +macro_rules! impl_encode_for_tuples { + ($( + $Tuple:ident { + $(($idx:tt) -> $T:ident)+ + } + )+) => { + $( + impl<$($T: Encode),+> Encode for ($($T,)+) { + fn is_ssz_fixed_len() -> bool { + $( + <$T as Encode>::is_ssz_fixed_len() && + )* + true + } + + fn ssz_fixed_len() -> usize { + if ::is_ssz_fixed_len() { + $( + <$T as Encode>::ssz_fixed_len() + + )* + 0 + } else { + BYTES_PER_LENGTH_OFFSET + } + } + + fn ssz_append(&self, buf: &mut Vec) { + let offset = $( + <$T as Encode>::ssz_fixed_len() + + )* + 0; + + let mut encoder = SszEncoder::container(buf, offset); + + $( + encoder.append(&self.$idx); + )* + + encoder.finalize(); + } + } + )+ + } +} + +impl_encode_for_tuples! { + Tuple2 { + (0) -> A + (1) -> B + } + Tuple3 { + (0) -> A + (1) -> B + (2) -> C + } + Tuple4 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + } + Tuple5 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + } + Tuple6 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + } + Tuple7 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + } + Tuple8 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + } + Tuple9 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + } + Tuple10 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + } + Tuple11 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + (10) -> K + } + Tuple12 { + (0) -> A + (1) -> B + (2) -> C + (3) -> D + (4) -> E + (5) -> F + (6) -> G + (7) -> H + (8) -> I + (9) -> J + (10) -> K + (11) -> L + } +} + /// The SSZ "union" type. impl Encode for Option { fn is_ssz_fixed_len() -> bool { @@ -287,4 +440,11 @@ mod tests { assert_eq!([1, 0, 0, 0].as_ssz_bytes(), vec![1, 0, 0, 0]); assert_eq!([1, 2, 3, 4].as_ssz_bytes(), vec![1, 2, 3, 4]); } + + #[test] + fn tuple() { + assert_eq!((10u8, 11u8).as_ssz_bytes(), vec![10, 11]); + assert_eq!((10u32, 11u8).as_ssz_bytes(), vec![10, 0, 0, 0, 11]); + assert_eq!((10u8, 11u8, 12u8).as_ssz_bytes(), vec![10, 11, 12]); + } } diff --git a/eth2/utils/ssz/src/lib.rs b/eth2/utils/ssz/src/lib.rs index fceebcc444..bcb9f525c5 100644 --- a/eth2/utils/ssz/src/lib.rs +++ b/eth2/utils/ssz/src/lib.rs @@ -2,7 +2,7 @@ //! format designed for use in Ethereum 2.0. //! //! Conforms to -//! [v0.6.1](https://github.com/ethereum/eth2.0-specs/blob/v0.6.1/specs/simple-serialize.md) of the +//! [v0.7.1](https://github.com/ethereum/eth2.0-specs/blob/v0.7.1/specs/simple-serialize.md) of the //! Ethereum 2.0 specification. //! //! ## Example @@ -46,7 +46,10 @@ pub use encode::{Encode, SszEncoder}; /// The number of bytes used to represent an offset. pub const BYTES_PER_LENGTH_OFFSET: usize = 4; /// The maximum value that can be represented using `BYTES_PER_LENGTH_OFFSET`. -pub const MAX_LENGTH_VALUE: usize = (1 << (BYTES_PER_LENGTH_OFFSET * 8)) - 1; +#[cfg(target_pointer_width = "32")] +pub const MAX_LENGTH_VALUE: usize = (std::u32::MAX >> 8 * (4 - BYTES_PER_LENGTH_OFFSET)) as usize; +#[cfg(target_pointer_width = "64")] +pub const MAX_LENGTH_VALUE: usize = (std::u64::MAX >> 8 * (8 - BYTES_PER_LENGTH_OFFSET)) as usize; /// Convenience function to SSZ encode an object supporting ssz::Encode. /// diff --git a/eth2/utils/ssz/tests/tests.rs b/eth2/utils/ssz/tests/tests.rs index 9447cf5372..c19e366622 100644 --- a/eth2/utils/ssz/tests/tests.rs +++ b/eth2/utils/ssz/tests/tests.rs @@ -346,4 +346,34 @@ mod round_trip { round_trip(vec); } + + #[test] + fn tuple_u8_u16() { + let vec: Vec<(u8, u16)> = vec![ + (0, 0), + (0, 1), + (1, 0), + (u8::max_value(), u16::max_value()), + (0, u16::max_value()), + (u8::max_value(), 0), + (42, 12301), + ]; + + round_trip(vec); + } + + #[test] + fn tuple_vec_vec() { + let vec: Vec<(u64, Vec, Vec>)> = vec![ + (0, vec![], vec![vec![]]), + (99, vec![101], vec![vec![], vec![]]), + ( + 42, + vec![12, 13, 14], + vec![vec![99, 98, 97, 96], vec![42, 44, 46, 48, 50]], + ), + ]; + + round_trip(vec); + } } diff --git a/eth2/utils/ssz_derive/Cargo.toml b/eth2/utils/ssz_derive/Cargo.toml index 3e58d752b8..0b76f6153a 100644 --- a/eth2/utils/ssz_derive/Cargo.toml +++ b/eth2/utils/ssz_derive/Cargo.toml @@ -1,14 +1,15 @@ [package] -name = "ssz_derive" +name = "eth2_ssz_derive" version = "0.1.0" -authors = ["Paul Hauner "] +authors = ["Paul Hauner "] edition = "2018" -description = "Procedural derive macros for SSZ encoding and decoding." +description = "Procedural derive macros to accompany the eth2_ssz crate." [lib] +name = "ssz_derive" proc-macro = true [dependencies] syn = "0.15" quote = "0.6" -ssz = { path = "../ssz" } +eth2_ssz = { path = "../ssz" } diff --git a/eth2/validator_change/Cargo.toml b/eth2/validator_change/Cargo.toml index a1c499340e..725799612f 100644 --- a/eth2/validator_change/Cargo.toml +++ b/eth2/validator_change/Cargo.toml @@ -7,5 +7,5 @@ edition = "2018" [dependencies] bytes = "0.4.10" hashing = { path = "../utils/hashing" } -ssz = { path = "../utils/ssz" } +eth2_ssz = { path = "../utils/ssz" } types = { path = "../types" } diff --git a/tests/ef_tests/Cargo.toml b/tests/ef_tests/Cargo.toml index e199f4cb6d..e8d6b0f2f6 100644 --- a/tests/ef_tests/Cargo.toml +++ b/tests/ef_tests/Cargo.toml @@ -17,7 +17,7 @@ serde = "1.0" serde_derive = "1.0" serde_repr = "0.1" serde_yaml = "0.8" -ssz = { path = "../../eth2/utils/ssz" } +eth2_ssz = { path = "../../eth2/utils/ssz" } tree_hash = { path = "../../eth2/utils/tree_hash" } cached_tree_hash = { path = "../../eth2/utils/cached_tree_hash" } state_processing = { path = "../../eth2/state_processing" } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 1784bdcb1e..1972f870cd 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -14,7 +14,7 @@ path = "src/lib.rs" [dependencies] bls = { path = "../eth2/utils/bls" } -ssz = { path = "../eth2/utils/ssz" } +eth2_ssz = { path = "../eth2/utils/ssz" } eth2_config = { path = "../eth2/utils/eth2_config" } tree_hash = { path = "../eth2/utils/tree_hash" } clap = "2.32.0" @@ -34,3 +34,4 @@ toml = "^0.5" error-chain = "0.12.0" bincode = "^1.1.2" futures = "0.1.25" +dirs = "2.0.1" diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index f749154385..5beea4c38c 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -9,9 +9,10 @@ mod signer; use crate::config::Config as ValidatorClientConfig; use crate::service::Service as ValidatorService; use clap::{App, Arg}; -use eth2_config::{get_data_dir, read_from_file, write_to_file, Eth2Config}; +use eth2_config::{read_from_file, write_to_file, Eth2Config}; use protos::services_grpc::ValidatorServiceClient; use slog::{crit, error, info, o, Drain}; +use std::fs; use std::path::PathBuf; use types::{Keypair, MainnetEthSpec, MinimalEthSpec}; @@ -35,6 +36,7 @@ fn main() { .arg( Arg::with_name("datadir") .long("datadir") + .short("d") .value_name("DIR") .help("Data directory for keys and databases.") .takes_value(true), @@ -66,14 +68,34 @@ fn main() { ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(log, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); // Attempt to lead the `ClientConfig` from disk.