diff --git a/Cargo.toml b/Cargo.toml index d149030b66..cb070cc2da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,11 @@ members = [ "eth2/utils/test_random_derive", "beacon_node", "beacon_node/db", + "beacon_node/client", + "beacon_node/network", + "beacon_node/eth2-libp2p", + "beacon_node/rpc", + "beacon_node/version", "beacon_node/beacon_chain", "beacon_node/beacon_chain/test_harness", "protos", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index b76bc3e82f..e7aaf938de 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,25 +1,19 @@ [package] name = "beacon_node" version = "0.1.0" -authors = ["Paul Hauner "] +authors = ["Paul Hauner ", "Age Manning "] +authors = ["Paul Hauner ", "Age Manning "] edition = "2018" [dependencies] diff --git a/beacon_node/beacon_chain/src/initialise.rs b/beacon_node/beacon_chain/src/initialise.rs new file mode 100644 index 0000000000..7d3c87965f --- /dev/null +++ b/beacon_node/beacon_chain/src/initialise.rs @@ -0,0 +1,94 @@ +// Initialisation functions to generate a new BeaconChain. +// Note: A new version of ClientTypes may need to be implemented for the lighthouse +// testnet. These are examples. Also. there is code duplication which can/should be cleaned up. + +use crate::BeaconChain; +use db::stores::{BeaconBlockStore, BeaconStateStore}; +use db::{DiskDB, MemoryDB}; +use fork_choice::BitwiseLMDGhost; +use slot_clock::SystemTimeSlotClock; +use ssz::TreeHash; +use std::path::PathBuf; +use std::sync::Arc; +use types::test_utils::TestingBeaconStateBuilder; +use types::{BeaconBlock, ChainSpec, Hash256}; + +//TODO: Correct this for prod +//TODO: Account for historical db +pub fn initialise_beacon_chain( + spec: &ChainSpec, + db_name: Option<&PathBuf>, +) -> Arc>> { + // set up the db + let db = Arc::new(DiskDB::open( + db_name.expect("Database directory must be included"), + None, + )); + + let block_store = Arc::new(BeaconBlockStore::new(db.clone())); + let state_store = Arc::new(BeaconStateStore::new(db.clone())); + + let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, &spec); + let (genesis_state, _keypairs) = state_builder.build(); + + let mut genesis_block = BeaconBlock::empty(&spec); + genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); + + // Slot clock + let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot) + .expect("Unable to load SystemTimeSlotClock"); + // Choose the fork choice + let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + + // Genesis chain + //TODO: Handle error correctly + Arc::new( + BeaconChain::from_genesis( + state_store.clone(), + block_store.clone(), + slot_clock, + genesis_state, + genesis_block, + spec.clone(), + fork_choice, + ) + .expect("Terminate if beacon chain generation fails"), + ) +} + +/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. +pub fn initialise_test_beacon_chain( + spec: &ChainSpec, + _db_name: Option<&PathBuf>, +) -> Arc>> { + let db = Arc::new(MemoryDB::open()); + let block_store = Arc::new(BeaconBlockStore::new(db.clone())); + let state_store = Arc::new(BeaconStateStore::new(db.clone())); + + let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, spec); + let (genesis_state, _keypairs) = state_builder.build(); + + let mut genesis_block = BeaconBlock::empty(spec); + genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); + + // Slot clock + let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot) + .expect("Unable to load SystemTimeSlotClock"); + // Choose the fork choice + let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + + // Genesis chain + //TODO: Handle error correctly + Arc::new( + BeaconChain::from_genesis( + state_store.clone(), + block_store.clone(), + slot_clock, + genesis_state, + genesis_block, + spec.clone(), + fork_choice, + ) + .expect("Terminate if beacon chain generation fails"), + ) +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 0e879a4151..2137c0edfd 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,8 +2,13 @@ mod attestation_aggregator; mod beacon_chain; mod checkpoint; mod errors; +pub mod initialise; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; pub use self::errors::BeaconChainError; -pub use fork_choice::{ForkChoice, ForkChoiceAlgorithm, ForkChoiceError}; +pub use db; +pub use fork_choice; +pub use parking_lot; +pub use slot_clock; +pub use types; diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml new file mode 100644 index 0000000000..12c1b5c802 --- /dev/null +++ b/beacon_node/client/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "client" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +beacon_chain = { path = "../beacon_chain" } +network = { path = "../network" } +db = { path = "../db" } +rpc = { path = "../rpc" } +fork_choice = { path = "../../eth2/fork_choice" } +types = { path = "../../eth2/types" } +slot_clock = { path = "../../eth2/utils/slot_clock" } +error-chain = "0.12.0" +slog = "^2.2.3" +tokio = "0.1.15" +clap = "2.32.0" +dirs = "1.0.3" +exit-future = "0.1.3" +futures = "0.1.25" diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/client_config.rs new file mode 100644 index 0000000000..cad287f2cc --- /dev/null +++ b/beacon_node/client/src/client_config.rs @@ -0,0 +1,124 @@ +use clap::ArgMatches; +use db::DBType; +use fork_choice::ForkChoiceAlgorithm; +use network::NetworkConfig; +use slog::error; +use std::fs; +use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr}; +use std::path::PathBuf; +use types::multiaddr::Protocol; +use types::multiaddr::ToMultiaddr; +use types::ChainSpec; + +/// Stores the client configuration for this Lighthouse instance. +#[derive(Debug, Clone)] +pub struct ClientConfig { + pub data_dir: PathBuf, + pub spec: ChainSpec, + pub net_conf: network::NetworkConfig, + pub fork_choice: ForkChoiceAlgorithm, + pub db_type: DBType, + pub db_name: PathBuf, + pub rpc_conf: rpc::RPCConfig, + //pub ipc_conf: +} + +impl Default for ClientConfig { + /// Build a new lighthouse configuration from defaults. + fn default() -> Self { + let data_dir = { + let home = dirs::home_dir().expect("Unable to determine home dir."); + home.join(".lighthouse/") + }; + fs::create_dir_all(&data_dir) + .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir)); + + let default_spec = ChainSpec::lighthouse_testnet(); + let default_net_conf = NetworkConfig::new(default_spec.boot_nodes.clone()); + + Self { + data_dir: data_dir.clone(), + // default to foundation for chain specs + spec: default_spec, + net_conf: default_net_conf, + // default to bitwise LMD Ghost + fork_choice: ForkChoiceAlgorithm::BitwiseLMDGhost, + // default to memory db for now + db_type: DBType::Memory, + // default db name for disk-based dbs + db_name: data_dir.join("chain.db"), + rpc_conf: rpc::RPCConfig::default(), + } + } +} + +impl ClientConfig { + /// Parses the CLI arguments into a `Config` struct. + pub fn parse_args(args: ArgMatches, log: &slog::Logger) -> Result { + let mut config = ClientConfig::default(); + + /* Network related arguments */ + + // Custom p2p listen port + if let Some(port_str) = args.value_of("port") { + if let Ok(port) = port_str.parse::() { + config.net_conf.listen_port = port; + // update the listening multiaddrs + for address in &mut config.net_conf.listen_addresses { + address.pop(); + address.append(Protocol::Tcp(port)); + } + } else { + error!(log, "Invalid port"; "port" => port_str); + return Err("Invalid port"); + } + } + // Custom listening address ipv4/ipv6 + // TODO: Handle list of addresses + if let Some(listen_address_str) = args.value_of("listen_address") { + if let Ok(listen_address) = listen_address_str.parse::() { + let multiaddr = SocketAddr::new(listen_address, config.net_conf.listen_port) + .to_multiaddr() + .expect("Invalid listen address format"); + config.net_conf.listen_addresses = vec![multiaddr]; + } else { + error!(log, "Invalid IP Address"; "Address" => listen_address_str); + return Err("Invalid IP Address"); + } + } + + /* Filesystem related arguments */ + + // Custom datadir + if let Some(dir) = args.value_of("datadir") { + config.data_dir = PathBuf::from(dir.to_string()); + }; + + /* RPC related arguments */ + + if args.is_present("rpc") { + config.rpc_conf.enabled = true; + } + + if let Some(rpc_address) = args.value_of("rpc-address") { + if let Ok(listen_address) = rpc_address.parse::() { + config.rpc_conf.listen_address = listen_address; + } else { + error!(log, "Invalid RPC listen address"; "Address" => rpc_address); + return Err("Invalid RPC listen address"); + } + } + + if let Some(rpc_port) = args.value_of("rpc-port") { + if let Ok(port) = rpc_port.parse::() { + config.rpc_conf.port = port; + } else { + error!(log, "Invalid RPC port"; "port" => rpc_port); + return Err("Invalid RPC port"); + } + } + + Ok(config) + } +} diff --git a/beacon_node/client/src/client_types.rs b/beacon_node/client/src/client_types.rs new file mode 100644 index 0000000000..f5abc77ced --- /dev/null +++ b/beacon_node/client/src/client_types.rs @@ -0,0 +1,49 @@ +use crate::ClientConfig; +use beacon_chain::{ + db::{ClientDB, DiskDB, MemoryDB}, + fork_choice::BitwiseLMDGhost, + initialise, + slot_clock::{SlotClock, SystemTimeSlotClock}, + BeaconChain, +}; +use fork_choice::ForkChoice; + +use std::sync::Arc; + +pub trait ClientTypes { + type DB: ClientDB + 'static; + type SlotClock: SlotClock + 'static; + type ForkChoice: ForkChoice + 'static; + + fn initialise_beacon_chain( + config: &ClientConfig, + ) -> Arc>; +} + +pub struct StandardClientType; + +impl ClientTypes for StandardClientType { + type DB = DiskDB; + type SlotClock = SystemTimeSlotClock; + type ForkChoice = BitwiseLMDGhost; + + fn initialise_beacon_chain( + config: &ClientConfig, + ) -> Arc> { + initialise::initialise_beacon_chain(&config.spec, Some(&config.db_name)) + } +} + +pub struct TestingClientType; + +impl ClientTypes for TestingClientType { + type DB = MemoryDB; + type SlotClock = SystemTimeSlotClock; + type ForkChoice = BitwiseLMDGhost; + + fn initialise_beacon_chain( + config: &ClientConfig, + ) -> Arc> { + initialise::initialise_test_beacon_chain(&config.spec, None) + } +} diff --git a/beacon_node/client/src/error.rs b/beacon_node/client/src/error.rs new file mode 100644 index 0000000000..618813826b --- /dev/null +++ b/beacon_node/client/src/error.rs @@ -0,0 +1,14 @@ +// generates error types +use network; + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! { + links { + Network(network::error::Error, network::error::ErrorKind); + } + +} diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs new file mode 100644 index 0000000000..914e47fcf4 --- /dev/null +++ b/beacon_node/client/src/lib.rs @@ -0,0 +1,77 @@ +extern crate slog; + +mod client_config; +pub mod client_types; +pub mod error; +pub mod notifier; + +use beacon_chain::BeaconChain; +pub use client_config::ClientConfig; +pub use client_types::ClientTypes; +use exit_future::Signal; +use network::Service as NetworkService; +use slog::o; +use std::marker::PhantomData; +use std::sync::Arc; +use tokio::runtime::TaskExecutor; + +/// Main beacon node client service. This provides the connection and initialisation of the clients +/// sub-services in multiple threads. +pub struct Client { + /// Configuration for the lighthouse client. + config: ClientConfig, + /// The beacon chain for the running client. + beacon_chain: Arc>, + /// Reference to the network service. + pub network: Arc, + /// Future to stop and begin shutdown of the Client. + //TODO: Decide best way to handle shutdown + pub exit: exit_future::Exit, + /// The sending future to call to terminate the Client. + //TODO: Decide best way to handle shutdown + pub exit_signal: Signal, + /// The clients logger. + log: slog::Logger, + /// Marker to pin the beacon chain generics. + phantom: PhantomData, +} + +impl Client { + /// Generate an instance of the client. Spawn and link all internal sub-processes. + pub fn new( + config: ClientConfig, + log: slog::Logger, + executor: &TaskExecutor, + ) -> error::Result { + let (exit_signal, exit) = exit_future::signal(); + + // generate a beacon chain + let beacon_chain = TClientType::initialise_beacon_chain(&config); + + // Start the network service, libp2p and syncing threads + // TODO: Add beacon_chain reference to network parameters + let network_config = &config.net_conf; + let network_logger = log.new(o!("Service" => "Network")); + let (network, _network_send) = NetworkService::new( + beacon_chain.clone(), + network_config, + executor, + network_logger, + )?; + + // spawn the RPC server + if config.rpc_conf.enabled { + rpc::start_server(&config.rpc_conf, &log); + } + + Ok(Client { + config, + beacon_chain, + exit, + exit_signal, + log, + network, + phantom: PhantomData, + }) + } +} diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs new file mode 100644 index 0000000000..335183c7de --- /dev/null +++ b/beacon_node/client/src/notifier.rs @@ -0,0 +1,45 @@ +use crate::Client; +use crate::ClientTypes; +use exit_future::Exit; +use futures::{Future, Stream}; +use slog::{debug, info, o}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::runtime::TaskExecutor; +use tokio::timer::Interval; + +/// Thread that monitors the client and reports useful statistics to the user. + +pub fn run(client: &Client, executor: TaskExecutor, exit: Exit) { + // notification heartbeat + let interval = Interval::new(Instant::now(), Duration::from_secs(5)); + + let log = client.log.new(o!("Service" => "Notifier")); + + // TODO: Debugging only + let counter = Arc::new(Mutex::new(0)); + let network = client.network.clone(); + + // build heartbeat logic here + let heartbeat = move |_| { + info!(log, "Temp heartbeat output"); + //TODO: Remove this logic. Testing only + let mut count = counter.lock().unwrap(); + *count += 1; + + if *count % 5 == 0 { + debug!(log, "Sending Message"); + network.send_message(); + } + + Ok(()) + }; + + // map error and spawn + let log = client.log.clone(); + let heartbeat_interval = interval + .map_err(move |e| debug!(log, "Timer error {}", e)) + .for_each(heartbeat); + + executor.spawn(exit.until(heartbeat_interval).map(|_| ())); +} diff --git a/beacon_node/db/src/lib.rs b/beacon_node/db/src/lib.rs index a646d7d2ee..5e710ae9a1 100644 --- a/beacon_node/db/src/lib.rs +++ b/beacon_node/db/src/lib.rs @@ -12,3 +12,10 @@ use self::stores::COLUMNS; pub use self::disk_db::DiskDB; pub use self::memory_db::MemoryDB; pub use self::traits::{ClientDB, DBError, DBValue}; + +/// Currently available database options +#[derive(Debug, Clone)] +pub enum DBType { + Memory, + RocksDB, +} diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml new file mode 100644 index 0000000000..4dd2e9c7b4 --- /dev/null +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "eth2-libp2p" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +# SigP repository until PR is merged +libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" } +types = { path = "../../eth2/types" } +ssz = { path = "../../eth2/utils/ssz" } +ssz_derive = { path = "../../eth2/utils/ssz_derive" } +slog = "2.4.1" +version = { path = "../version" } +tokio = "0.1.16" +futures = "0.1.25" +error-chain = "0.12.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs new file mode 100644 index 0000000000..78d0130025 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -0,0 +1,96 @@ +use crate::rpc::{RPCEvent, RPCMessage, Rpc}; +use futures::prelude::*; +use libp2p::{ + core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, + gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent}, + tokio_io::{AsyncRead, AsyncWrite}, + NetworkBehaviour, PeerId, +}; +use types::Topic; + +/// Builds the network behaviour for the libp2p Swarm. +/// Implements gossipsub message routing. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] +pub struct Behaviour { + gossipsub: Gossipsub, + // TODO: Add Kademlia for peer discovery + /// The events generated by this behaviour to be consumed in the swarm poll. + serenity_rpc: Rpc, + #[behaviour(ignore)] + events: Vec, +} + +// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: GossipsubEvent) { + match event { + GossipsubEvent::Message(message) => { + let gs_message = String::from_utf8_lossy(&message.data); + // TODO: Remove this type - debug only + self.events + .push(BehaviourEvent::Message(gs_message.to_string())) + } + _ => {} + } + } +} + +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: RPCMessage) { + match event { + RPCMessage::PeerDialed(peer_id) => { + self.events.push(BehaviourEvent::PeerDialed(peer_id)) + } + RPCMessage::RPC(peer_id, rpc_event) => { + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) + } + } + } +} + +impl Behaviour { + pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self { + Behaviour { + gossipsub: Gossipsub::new(local_peer_id, gs_config), + serenity_rpc: Rpc::new(log), + events: Vec::new(), + } + } + + /// Consumes the events list when polled. + fn poll( + &mut self, + ) -> Async> { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + + Async::NotReady + } +} + +/// Implements the combined behaviour for the libp2p service. +impl Behaviour { + /// Subscribes to a gossipsub topic. + pub fn subscribe(&mut self, topic: Topic) -> bool { + self.gossipsub.subscribe(topic) + } + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); + } +} + +/// The types of events than can be obtained from polling the behaviour. +pub enum BehaviourEvent { + RPC(PeerId, RPCEvent), + PeerDialed(PeerId), + // TODO: This is a stub at the moment + Message(String), +} diff --git a/beacon_node/eth2-libp2p/src/error.rs b/beacon_node/eth2-libp2p/src/error.rs new file mode 100644 index 0000000000..163fe575d2 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/error.rs @@ -0,0 +1,8 @@ +// generates error types + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! {} diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs new file mode 100644 index 0000000000..f3e97355d7 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -0,0 +1,20 @@ +/// This crate contains the main link for lighthouse to rust-libp2p. It therefore re-exports +/// all required libp2p functionality. +/// +/// This crate builds and manages the libp2p services required by the beacon node. +pub mod behaviour; +pub mod error; +mod network_config; +pub mod rpc; +mod service; + +pub use libp2p::{ + gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, + PeerId, +}; +pub use network_config::NetworkConfig; +pub use rpc::{HelloMessage, RPCEvent}; +pub use service::Libp2pEvent; +pub use service::Service; +pub use types::multiaddr; +pub use types::Multiaddr; diff --git a/beacon_node/eth2-libp2p/src/network_config.rs b/beacon_node/eth2-libp2p/src/network_config.rs new file mode 100644 index 0000000000..176892bb02 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/network_config.rs @@ -0,0 +1,59 @@ +use crate::Multiaddr; +use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; +use libp2p::secio; +use std::fmt; + +#[derive(Clone)] +/// Network configuration for lighthouse. +pub struct NetworkConfig { + //TODO: stubbing networking initial params, change in the future + /// IP address to listen on. + pub listen_addresses: Vec, + /// Listen port UDP/TCP. + pub listen_port: u16, + /// Gossipsub configuration parameters. + pub gs_config: GossipsubConfig, + /// List of nodes to initially connect to. + pub boot_nodes: Vec, + /// Peer key related to this nodes PeerId. + pub local_private_key: secio::SecioKeyPair, + /// Client version + pub client_version: String, + /// List of topics to subscribe to as strings + pub topics: Vec, +} + +impl Default for NetworkConfig { + /// Generate a default network configuration. + fn default() -> Self { + // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this + // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 + + NetworkConfig { + listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000" + .parse() + .expect("is a correct multi-address")], + listen_port: 9000, + gs_config: GossipsubConfigBuilder::new().build(), + boot_nodes: Vec::new(), + local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(), + client_version: version::version(), + topics: vec![String::from("beacon_chain")], + } + } +} + +impl NetworkConfig { + pub fn new(boot_nodes: Vec) -> Self { + let mut conf = NetworkConfig::default(); + conf.boot_nodes = boot_nodes; + + conf + } +} + +impl fmt::Debug for NetworkConfig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: , client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version) + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs new file mode 100644 index 0000000000..3014afd0ff --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -0,0 +1,161 @@ +/// Available RPC methods types and ids. +use ssz_derive::{Decode, Encode}; +use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; + +#[derive(Debug)] +/// Available Serenity Libp2p RPC methods +pub enum RPCMethod { + /// Initialise handshake between connecting peers. + Hello, + /// Terminate a connection providing a reason. + Goodbye, + /// Requests a number of beacon block roots. + BeaconBlockRoots, + /// Requests a number of beacon block headers. + BeaconBlockHeaders, + /// Requests a number of beacon block bodies. + BeaconBlockBodies, + /// Requests values for a merkle proof for the current blocks state root. + BeaconChainState, // Note: experimental, not complete. + /// Unknown method received. + Unknown, +} + +impl From for RPCMethod { + fn from(method_id: u16) -> Self { + match method_id { + 0 => RPCMethod::Hello, + 1 => RPCMethod::Goodbye, + 10 => RPCMethod::BeaconBlockRoots, + 11 => RPCMethod::BeaconBlockHeaders, + 12 => RPCMethod::BeaconBlockBodies, + 13 => RPCMethod::BeaconChainState, + + _ => RPCMethod::Unknown, + } + } +} + +impl Into for RPCMethod { + fn into(self) -> u16 { + match self { + RPCMethod::Hello => 0, + RPCMethod::Goodbye => 1, + RPCMethod::BeaconBlockRoots => 10, + RPCMethod::BeaconBlockHeaders => 11, + RPCMethod::BeaconBlockBodies => 12, + RPCMethod::BeaconChainState => 13, + _ => 0, + } + } +} + +#[derive(Debug, Clone)] +pub enum RPCRequest { + Hello(HelloMessage), + Goodbye(u64), + BeaconBlockRoots(BeaconBlockRootsRequest), + BeaconBlockHeaders(BeaconBlockHeadersRequest), + BeaconBlockBodies(BeaconBlockBodiesRequest), + BeaconChainState(BeaconChainStateRequest), +} + +#[derive(Debug, Clone)] +pub enum RPCResponse { + Hello(HelloMessage), + BeaconBlockRoots(BeaconBlockRootsResponse), + BeaconBlockHeaders(BeaconBlockHeadersResponse), + BeaconBlockBodies(BeaconBlockBodiesResponse), + BeaconChainState(BeaconChainStateResponse), +} + +/* Request/Response data structures for RPC methods */ + +/// The HELLO request/response handshake message. +#[derive(Encode, Decode, Clone, Debug)] +pub struct HelloMessage { + /// The network ID of the peer. + pub network_id: u8, + /// The peers last finalized root. + pub latest_finalized_root: Hash256, + /// The peers last finalized epoch. + pub latest_finalized_epoch: Epoch, + /// The peers last block root. + pub best_root: Hash256, + /// The peers last slot. + pub best_slot: Slot, +} + +/// Request a number of beacon block roots from a peer. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockRootsRequest { + /// The starting slot of the requested blocks. + start_slot: Slot, + /// The number of blocks from the start slot. + count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers +} + +/// Response containing a number of beacon block roots from a peer. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockRootsResponse { + /// List of requested blocks and associated slots. + roots: Vec, +} + +/// Contains a block root and associated slot. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BlockRootSlot { + /// The block root. + block_root: Hash256, + /// The block slot. + slot: Slot, +} + +/// Request a number of beacon block headers from a peer. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockHeadersRequest { + /// The starting header hash of the requested headers. + start_root: Hash256, + /// The starting slot of the requested headers. + start_slot: Slot, + /// The maximum number of headers than can be returned. + max_headers: u64, + /// The maximum number of slots to skip between blocks. + skip_slots: u64, +} + +/// Response containing requested block headers. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockHeadersResponse { + /// The list of requested beacon block headers. + headers: Vec, +} + +/// Request a number of beacon block bodies from a peer. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockBodiesRequest { + /// The list of beacon block bodies being requested. + block_roots: Hash256, +} + +/// Response containing the list of requested beacon block bodies. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconBlockBodiesResponse { + /// The list of beacon block bodies being requested. + block_bodies: Vec, +} + +/// Request values for tree hashes which yield a blocks `state_root`. +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconChainStateRequest { + /// The tree hashes that a value is requested for. + hashes: Vec, +} + +/// Request values for tree hashes which yield a blocks `state_root`. +// Note: TBD +#[derive(Encode, Decode, Clone, Debug)] +pub struct BeaconChainStateResponse { + /// The values corresponding the to the requested tree hashes. + values: bool, //TBD - stubbed with encodeable bool +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs new file mode 100644 index 0000000000..a1cfadafe6 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -0,0 +1,138 @@ +/// RPC Protocol over libp2p. +/// +/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on +/// `/eth/serenity/rpc/1.0.0` +mod methods; +mod protocol; + +use futures::prelude::*; +use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler}; +use libp2p::core::swarm::{ + ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::{Multiaddr, PeerId}; +pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +pub use protocol::{RPCEvent, RPCProtocol}; +use slog::o; +use std::marker::PhantomData; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0 +/// specification. + +pub struct Rpc { + /// Queue of events to processed. + events: Vec>, + /// Pins the generic substream. + marker: PhantomData, + /// Slog logger for RPC behaviour. + log: slog::Logger, +} + +impl Rpc { + pub fn new(log: &slog::Logger) -> Self { + let log = log.new(o!("Service" => "Libp2p-RPC")); + Rpc { + events: Vec::new(), + marker: PhantomData, + log, + } + } + + /// Submits and RPC request. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.events.push(NetworkBehaviourAction::SendEvent { + peer_id, + event: rpc_event, + }); + } +} + +impl NetworkBehaviour for Rpc +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = OneShotHandler; + type OutEvent = RPCMessage; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + Default::default() + } + + fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { + // if initialised the connection, report this upwards to send the HELLO request + if let ConnectedPoint::Dialer { address: _ } = connected_point { + self.events.push(NetworkBehaviourAction::GenerateEvent( + RPCMessage::PeerDialed(peer_id), + )); + } + } + + fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + + fn inject_node_event( + &mut self, + source: PeerId, + event: ::OutEvent, + ) { + // ignore successful send events + let event = match event { + OneShotEvent::Rx(event) => event, + OneShotEvent::Sent => return, + }; + + // send the event to the user + self.events + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC( + source, event, + ))); + } + + fn poll( + &mut self, + _: &mut PollParameters<'_>, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + if !self.events.is_empty() { + return Async::Ready(self.events.remove(0)); + } + Async::NotReady + } +} + +/// Messages sent to the user from the RPC protocol. +pub enum RPCMessage { + RPC(PeerId, RPCEvent), + PeerDialed(PeerId), +} + +/// Transmission between the `OneShotHandler` and the `RPCEvent`. +#[derive(Debug)] +pub enum OneShotEvent { + /// We received an RPC from a remote. + Rx(RPCEvent), + /// We successfully sent an RPC request. + Sent, +} + +impl From for OneShotEvent { + #[inline] + fn from(rpc: RPCEvent) -> OneShotEvent { + OneShotEvent::Rx(rpc) + } +} + +impl From<()> for OneShotEvent { + #[inline] + fn from(_: ()) -> OneShotEvent { + OneShotEvent::Sent + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs new file mode 100644 index 0000000000..c19aca8ffd --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -0,0 +1,181 @@ +use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use ssz::{ssz_encode, Decodable, Encodable, SszStream}; +use std::io; +use std::iter; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// The maximum bytes that can be sent across the RPC. +const MAX_READ_SIZE: usize = 2048; + +/// Implementation of the `ConnectionUpgrade` for the rpc protocol. + +#[derive(Debug, Clone)] +pub struct RPCProtocol; + +impl UpgradeInfo for RPCProtocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/eth/serenity/rpc/1.0.0") + } +} + +impl Default for RPCProtocol { + fn default() -> Self { + RPCProtocol + } +} + +/// The RPC types which are sent/received in this protocol. +#[derive(Debug, Clone)] +pub enum RPCEvent { + Request { + id: u64, + method_id: u16, + body: RPCRequest, + }, + Response { + id: u64, + method_id: u16, //TODO: Remove and process decoding upstream + result: RPCResponse, + }, +} + +impl UpgradeInfo for RPCEvent { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/eth/serenity/rpc/1.0.0") + } +} + +impl InboundUpgrade for RPCProtocol +where + TSocket: AsyncRead + AsyncWrite, +{ + type Output = RPCEvent; + type Error = DecodeError; + type Future = + upgrade::ReadOneThen, ()) -> Result>; + + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?)) + } +} + +fn decode(packet: Vec) -> Result { + // decode the header of the rpc + // request/response + let (request, index) = bool::ssz_decode(&packet, 0)?; + let (id, index) = u64::ssz_decode(&packet, index)?; + let (method_id, index) = u16::ssz_decode(&packet, index)?; + + if request { + let body = match RPCMethod::from(method_id) { + RPCMethod::Hello => { + let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?; + RPCRequest::Hello(hello_body) + } + RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + }; + + Ok(RPCEvent::Request { + id, + method_id, + body, + }) + } + // we have received a response + else { + let result = match RPCMethod::from(method_id) { + RPCMethod::Hello => { + let (body, _index) = HelloMessage::ssz_decode(&packet, index)?; + RPCResponse::Hello(body) + } + RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + }; + Ok(RPCEvent::Response { + id, + method_id, + result, + }) + } +} + +impl OutboundUpgrade for RPCEvent +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = upgrade::WriteOne; + + #[inline] + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let bytes = ssz_encode(&self); + upgrade::write_one(socket, bytes) + } +} + +impl Encodable for RPCEvent { + fn ssz_append(&self, s: &mut SszStream) { + match self { + RPCEvent::Request { + id, + method_id, + body, + } => { + s.append(&true); + s.append(id); + s.append(method_id); + match body { + RPCRequest::Hello(body) => { + s.append(body); + } + _ => {} + } + } + RPCEvent::Response { + id, + method_id, + result, + } => { + s.append(&false); + s.append(id); + s.append(method_id); + match result { + RPCResponse::Hello(response) => { + s.append(response); + } + _ => {} + } + } + } + } +} + +#[derive(Debug)] +pub enum DecodeError { + ReadError(upgrade::ReadOneError), + SSZDecodeError(ssz::DecodeError), + UnknownRPCMethod, +} + +impl From for DecodeError { + #[inline] + fn from(err: upgrade::ReadOneError) -> Self { + DecodeError::ReadError(err) + } +} + +impl From for DecodeError { + #[inline] + fn from(err: ssz::DecodeError) -> Self { + DecodeError::SSZDecodeError(err) + } +} diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs new file mode 100644 index 0000000000..e378cd634a --- /dev/null +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -0,0 +1,163 @@ +use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::error; +use crate::multiaddr::Protocol; +use crate::rpc::RPCEvent; +use crate::NetworkConfig; +use futures::prelude::*; +use futures::Stream; +use libp2p::core::{ + muxing::StreamMuxerBox, + nodes::Substream, + transport::boxed::Boxed, + upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, +}; +use libp2p::{core, secio, Transport}; +use libp2p::{PeerId, Swarm}; +use slog::{debug, info, trace, warn}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; +use types::TopicBuilder; + +/// The configuration and state of the libp2p components for the beacon node. +pub struct Service { + /// The libp2p Swarm handler. + //TODO: Make this private + pub swarm: Swarm, Behaviour>>, + /// This node's PeerId. + local_peer_id: PeerId, + /// The libp2p logger handle. + pub log: slog::Logger, +} + +impl Service { + pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { + debug!(log, "Libp2p Service starting"); + + let local_private_key = config.local_private_key; + let local_peer_id = local_private_key.to_peer_id(); + info!(log, "Local peer id: {:?}", local_peer_id); + + let mut swarm = { + // Set up the transport + let transport = build_transport(local_private_key); + // Set up gossipsub routing + let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log); + // Set up Topology + let topology = local_peer_id.clone(); + Swarm::new(transport, behaviour, topology) + }; + + // listen on all addresses + for address in &config.listen_addresses { + match Swarm::listen_on(&mut swarm, address.clone()) { + Ok(mut listen_addr) => { + listen_addr.append(Protocol::P2p(local_peer_id.clone().into())); + info!(log, "Listening on: {}", listen_addr); + } + Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err), + }; + } + // connect to boot nodes - these are currently stored as multiaddrs + // Once we have discovery, can set to peerId + for bootnode in config.boot_nodes { + match Swarm::dial_addr(&mut swarm, bootnode.clone()) { + Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode), + Err(err) => debug!( + log, + "Could not connect to bootnode: {} error: {:?}", bootnode, err + ), + }; + } + + // subscribe to default gossipsub topics + let mut subscribed_topics = vec![]; + for topic in config.topics { + let t = TopicBuilder::new(topic.to_string()).build(); + if swarm.subscribe(t) { + trace!(log, "Subscribed to topic: {:?}", topic); + subscribed_topics.push(topic); + } else { + warn!(log, "Could not subscribe to topic: {:?}", topic) + } + } + info!(log, "Subscribed to topics: {:?}", subscribed_topics); + + Ok(Service { + local_peer_id, + swarm, + log, + }) + } +} + +impl Stream for Service { + type Item = Libp2pEvent; + type Error = crate::error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + // TODO: Currently only gossipsub events passed here. + // Build a type for more generic events + match self.swarm.poll() { + Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => { + // TODO: Stub here for debugging + debug!(self.log, "Message received: {}", m); + return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); + } + Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => { + return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); + } + Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); + } + Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), + Ok(Async::NotReady) => break, + _ => break, + } + } + Ok(Async::NotReady) + } +} + +/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and +/// mplex or yamux as the multiplexing layer. +fn build_transport( + local_private_key: secio::SecioKeyPair, +) -> Boxed<(PeerId, StreamMuxerBox), Error> { + // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised + // in the future. + let transport = libp2p::tcp::TcpConfig::new(); + let transport = libp2p::dns::DnsConfig::new(transport); + #[cfg(feature = "libp2p-websocket")] + let transport = { + let trans_clone = transport.clone(); + transport.or_transport(websocket::WsConfig::new(trans_clone)) + }; + transport + .with_upgrade(secio::SecioConfig::new(local_private_key)) + .and_then(move |out, endpoint| { + let peer_id = out.remote_key.into_peer_id(); + let peer_id2 = peer_id.clone(); + let upgrade = core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + ) + // TODO: use a single `.map` instead of two maps + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + + core::upgrade::apply(out.stream, upgrade, endpoint) + .map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) + }) + .with_timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() +} + +/// Events that can be obtained from polling the Libp2p Service. +pub enum Libp2pEvent { + // We have received an RPC event on the swarm + RPC(PeerId, RPCEvent), + PeerDialed(PeerId), + Message(String), +} diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml new file mode 100644 index 0000000000..5275ed82fe --- /dev/null +++ b/beacon_node/network/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "network" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +beacon_chain = { path = "../beacon_chain" } +eth2-libp2p = { path = "../eth2-libp2p" } +version = { path = "../version" } +types = { path = "../../eth2/types" } +slog = "2.4.1" +futures = "0.1.25" +error-chain = "0.12.0" +crossbeam-channel = "0.3.8" +tokio = "0.1.16" diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs new file mode 100644 index 0000000000..91628cc7e9 --- /dev/null +++ b/beacon_node/network/src/beacon_chain.rs @@ -0,0 +1,43 @@ +use beacon_chain::BeaconChain as RawBeaconChain; +use beacon_chain::{ + db::ClientDB, + fork_choice::ForkChoice, + parking_lot::RwLockReadGuard, + slot_clock::SlotClock, + types::{BeaconState, ChainSpec}, + CheckPoint, +}; + +/// The network's API to the beacon chain. +pub trait BeaconChain: Send + Sync { + fn get_spec(&self) -> &ChainSpec; + + fn get_state(&self) -> RwLockReadGuard; + + fn head(&self) -> RwLockReadGuard; + + fn finalized_head(&self) -> RwLockReadGuard; +} + +impl BeaconChain for RawBeaconChain +where + T: ClientDB + Sized, + U: SlotClock, + F: ForkChoice, +{ + fn get_spec(&self) -> &ChainSpec { + &self.spec + } + + fn get_state(&self) -> RwLockReadGuard { + self.state.read() + } + + fn head(&self) -> RwLockReadGuard { + self.head() + } + + fn finalized_head(&self) -> RwLockReadGuard { + self.finalized_head() + } +} diff --git a/beacon_node/network/src/error.rs b/beacon_node/network/src/error.rs new file mode 100644 index 0000000000..cdd6b62094 --- /dev/null +++ b/beacon_node/network/src/error.rs @@ -0,0 +1,13 @@ +// generates error types +use eth2_libp2p; + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! { + links { + Libp2p(eth2_libp2p::error::Error, eth2_libp2p::error::ErrorKind); + } +} diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs new file mode 100644 index 0000000000..61a29ed356 --- /dev/null +++ b/beacon_node/network/src/lib.rs @@ -0,0 +1,9 @@ +/// This crate provides the network server for Lighthouse. +pub mod beacon_chain; +pub mod error; +mod message_handler; +mod service; +pub mod sync; + +pub use eth2_libp2p::NetworkConfig; +pub use service::Service; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs new file mode 100644 index 0000000000..dbf8c7d9da --- /dev/null +++ b/beacon_node/network/src/message_handler.rs @@ -0,0 +1,225 @@ +use crate::beacon_chain::BeaconChain; +use crate::error; +use crate::service::{NetworkMessage, OutgoingMessage}; +use crate::sync::SimpleSync; +use crossbeam_channel::{unbounded as channel, Sender}; +use eth2_libp2p::{ + rpc::{RPCMethod, RPCRequest, RPCResponse}, + HelloMessage, PeerId, RPCEvent, +}; +use futures::future; +use slog::warn; +use slog::{debug, trace}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +/// Timeout for RPC requests. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); +/// Timeout before banning a peer for non-identification. +const HELLO_TIMEOUT: Duration = Duration::from_secs(30); + +/// Handles messages received from the network and client and organises syncing. +pub struct MessageHandler { + /// Currently loaded and initialised beacon chain. + chain: Arc, + /// The syncing framework. + sync: SimpleSync, + /// The network channel to relay messages to the Network service. + network_send: crossbeam_channel::Sender, + /// A mapping of peers and the RPC id we have sent an RPC request to. + requests: HashMap<(PeerId, u64), Instant>, + /// A counter of request id for each peer. + request_ids: HashMap, + /// The `MessageHandler` logger. + log: slog::Logger, +} + +/// Types of messages the handler can receive. +#[derive(Debug, Clone)] +pub enum HandlerMessage { + /// We have initiated a connection to a new peer. + PeerDialed(PeerId), + /// Peer has disconnected, + PeerDisconnected(PeerId), + /// An RPC response/request has been received. + RPC(PeerId, RPCEvent), + /// A block has been imported. + BlockImported(), //TODO: This comes from pub-sub - decide its contents +} + +impl MessageHandler { + /// Initializes and runs the MessageHandler. + pub fn spawn( + beacon_chain: Arc, + network_send: crossbeam_channel::Sender, + executor: &tokio::runtime::TaskExecutor, + log: slog::Logger, + ) -> error::Result> { + debug!(log, "Service starting"); + + let (handler_send, handler_recv) = channel(); + + // Initialise sync and begin processing in thread + // generate the Message handler + let sync = SimpleSync::new(beacon_chain.clone(), &log); + + let mut handler = MessageHandler { + // TODO: The handler may not need a chain, perhaps only sync? + chain: beacon_chain.clone(), + sync, + network_send, + requests: HashMap::new(), + request_ids: HashMap::new(), + + log: log.clone(), + }; + + // spawn handler task + // TODO: Handle manual termination of thread + executor.spawn(future::poll_fn(move || -> Result<_, _> { + loop { + handler.handle_message(handler_recv.recv().map_err(|_| { + debug!(log, "Network message handler terminated."); + })?); + } + })); + + Ok(handler_send) + } + + /// Handle all messages incoming from the network service. + fn handle_message(&mut self, message: HandlerMessage) { + match message { + // we have initiated a connection to a peer + HandlerMessage::PeerDialed(peer_id) => { + let id = self.generate_request_id(&peer_id); + self.send_hello(peer_id, id, true); + } + // we have received an RPC message request/response + HandlerMessage::RPC(peer_id, rpc_event) => { + self.handle_rpc_message(peer_id, rpc_event); + } + //TODO: Handle all messages + _ => {} + } + } + + /* RPC - Related functionality */ + + /// Handle RPC messages + fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { + match rpc_message { + RPCEvent::Request { id, body, .. // TODO: Clean up RPC Message types, have a cleaner type by this point. + } => self.handle_rpc_request(peer_id, id, body), + RPCEvent::Response { id, result, .. } => self.handle_rpc_response(peer_id, id, result), + } + } + + /// A new RPC request has been received from the network. + fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) { + match request { + RPCRequest::Hello(hello_message) => { + self.handle_hello_request(peer_id, id, hello_message) + } + // TODO: Handle all requests + _ => {} + } + } + + /// An RPC response has been received from the network. + // we match on id and ignore responses past the timeout. + fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) { + // if response id is related to a request, ignore (likely RPC timeout) + if self.requests.remove(&(peer_id.clone(), id)).is_none() { + debug!(self.log, "Unrecognized response from peer: {:?}", peer_id); + return; + } + match response { + RPCResponse::Hello(hello_message) => { + debug!(self.log, "Hello response received from peer: {:?}", peer_id); + self.validate_hello(peer_id, hello_message); + } + // TODO: Handle all responses + _ => {} + } + } + + /// Handle a HELLO RPC request message. + fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) { + // send back a HELLO message + self.send_hello(peer_id.clone(), id, false); + // validate the peer + self.validate_hello(peer_id, hello_message); + } + + /// Validate a HELLO RPC message. + fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) { + // validate the peer + if !self.sync.validate_peer(peer_id.clone(), message) { + debug!( + self.log, + "Peer dropped due to mismatching HELLO messages: {:?}", peer_id + ); + //TODO: block/ban the peer + } + } + + /* General RPC helper functions */ + + /// Generates a new request id for a peer. + fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 { + // generate a unique id for the peer + let id = { + let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); + let id = borrowed_id.clone(); + //increment the counter + *borrowed_id += 1; + id + }; + // register RPC request + self.requests.insert((peer_id.clone(), id), Instant::now()); + debug!( + self.log, + "Hello request registered with peer: {:?}", peer_id + ); + id + } + + /// Sends a HELLO RPC request or response to a newly connected peer. + //TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write + fn send_hello(&mut self, peer_id: PeerId, id: u64, is_request: bool) { + let rpc_event = if is_request { + RPCEvent::Request { + id, + method_id: RPCMethod::Hello.into(), + body: RPCRequest::Hello(self.sync.generate_hello()), + } + } else { + RPCEvent::Response { + id, + method_id: RPCMethod::Hello.into(), + result: RPCResponse::Hello(self.sync.generate_hello()), + } + }; + + // send the hello request to the network + trace!(self.log, "Sending HELLO message to peer {:?}", peer_id); + self.send_rpc(peer_id, rpc_event); + } + + /// Sends an RPC request/response to the network server. + fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) { + self.network_send + .send(NetworkMessage::Send( + peer_id, + OutgoingMessage::RPC(rpc_event), + )) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send RPC message to the network service" + ) + }); + } +} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs new file mode 100644 index 0000000000..14f994e4a5 --- /dev/null +++ b/beacon_node/network/src/service.rs @@ -0,0 +1,180 @@ +use crate::beacon_chain::BeaconChain; +use crate::error; +use crate::message_handler::{HandlerMessage, MessageHandler}; +use crate::NetworkConfig; +use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; +use eth2_libp2p::RPCEvent; +use eth2_libp2p::Service as LibP2PService; +use eth2_libp2p::{Libp2pEvent, PeerId}; +use futures::prelude::*; +use futures::sync::oneshot; +use futures::Stream; +use slog::{debug, info, o, trace}; +use std::sync::Arc; +use tokio::runtime::TaskExecutor; + +/// Service that handles communication between internal services and the eth2_libp2p network service. +pub struct Service { + //eth2_libp2p_service: Arc>, + eth2_libp2p_exit: oneshot::Sender<()>, + network_send: crossbeam_channel::Sender, + //message_handler: MessageHandler, + //message_handler_send: Sender, +} + +impl Service { + pub fn new( + beacon_chain: Arc, + config: &NetworkConfig, + executor: &TaskExecutor, + log: slog::Logger, + ) -> error::Result<(Arc, Sender)> { + // build the network channel + let (network_send, network_recv) = channel::(); + // launch message handler thread + let message_handler_log = log.new(o!("Service" => "MessageHandler")); + let message_handler_send = MessageHandler::spawn( + beacon_chain, + network_send.clone(), + executor, + message_handler_log, + )?; + + // launch eth2_libp2p service + let eth2_libp2p_log = log.new(o!("Service" => "Libp2p")); + let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?; + + // TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread. + let eth2_libp2p_exit = spawn_service( + eth2_libp2p_service, + network_recv, + message_handler_send, + executor, + log, + )?; + let network_service = Service { + eth2_libp2p_exit, + network_send: network_send.clone(), + }; + + Ok((Arc::new(network_service), network_send)) + } + + // TODO: Testing only + pub fn send_message(&self) { + self.network_send + .send(NetworkMessage::Send( + PeerId::random(), + OutgoingMessage::NotifierTest, + )) + .unwrap(); + } +} + +fn spawn_service( + eth2_libp2p_service: LibP2PService, + network_recv: crossbeam_channel::Receiver, + message_handler_send: crossbeam_channel::Sender, + executor: &TaskExecutor, + log: slog::Logger, +) -> error::Result> { + let (network_exit, exit_rx) = oneshot::channel(); + + // spawn on the current executor + executor.spawn( + network_service( + eth2_libp2p_service, + network_recv, + message_handler_send, + log.clone(), + ) + // allow for manual termination + .select(exit_rx.then(|_| Ok(()))) + .then(move |_| { + info!(log.clone(), "Network service shutdown"); + Ok(()) + }), + ); + + Ok(network_exit) +} + +fn network_service( + mut eth2_libp2p_service: LibP2PService, + network_recv: crossbeam_channel::Receiver, + message_handler_send: crossbeam_channel::Sender, + log: slog::Logger, +) -> impl futures::Future { + futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { + // poll the swarm + loop { + match eth2_libp2p_service.poll() { + Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { + trace!( + eth2_libp2p_service.log, + "RPC Event: RPC message received: {:?}", + rpc_event + ); + message_handler_send + .send(HandlerMessage::RPC(peer_id, rpc_event)) + .map_err(|_| "failed to send rpc to handler")?; + } + Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { + debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDialed(peer_id)) + .map_err(|_| "failed to send rpc to handler")?; + } + Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( + eth2_libp2p_service.log, + "Network Service: Message received: {}", m + ), + _ => break, + } + } + // poll the network channel + // TODO: refactor - combine poll_fn's? + loop { + match network_recv.try_recv() { + // TODO: Testing message - remove + Ok(NetworkMessage::Send(peer_id, outgoing_message)) => { + match outgoing_message { + OutgoingMessage::RPC(rpc_event) => { + trace!(log, "Sending RPC Event: {:?}", rpc_event); + //TODO: Make swarm private + //TODO: Implement correct peer id topic message handling + eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event); + } + OutgoingMessage::NotifierTest => { + debug!(log, "Received message from notifier"); + } + }; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(eth2_libp2p::error::Error::from( + "Network channel disconnected", + )); + } + } + } + Ok(Async::NotReady) + }) +} + +/// Types of messages that the network service can receive. +#[derive(Debug, Clone)] +pub enum NetworkMessage { + /// Send a message to eth2_libp2p service. + //TODO: Define typing for messages across the wire + Send(PeerId, OutgoingMessage), +} + +/// Type of outgoing messages that can be sent through the network service. +#[derive(Debug, Clone)] +pub enum OutgoingMessage { + /// Send an RPC request/response. + RPC(RPCEvent), + //TODO: Remove + NotifierTest, +} diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs new file mode 100644 index 0000000000..8f5216b857 --- /dev/null +++ b/beacon_node/network/src/sync/mod.rs @@ -0,0 +1,11 @@ +/// Syncing for lighthouse. +/// +/// Stores the various syncing methods for the beacon chain. +mod simple_sync; + +pub use simple_sync::SimpleSync; + +/// Currently implemented sync methods. +pub enum SyncMethod { + SimpleSync, +} diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs new file mode 100644 index 0000000000..0f7de6ab97 --- /dev/null +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -0,0 +1,112 @@ +use crate::beacon_chain::BeaconChain; +use eth2_libp2p::rpc::HelloMessage; +use eth2_libp2p::PeerId; +use slog::{debug, o}; +use std::collections::HashMap; +use std::sync::Arc; +use types::{Epoch, Hash256, Slot}; + +/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. +const SLOT_IMPORT_TOLERANCE: u64 = 100; + +/// Keeps track of syncing information for known connected peers. +pub struct PeerSyncInfo { + latest_finalized_root: Hash256, + latest_finalized_epoch: Epoch, + best_root: Hash256, + best_slot: Slot, +} + +/// The current syncing state. +#[derive(PartialEq)] +pub enum SyncState { + Idle, + Downloading, + Stopped, +} + +/// Simple Syncing protocol. +//TODO: Decide for HELLO messages whether its better to keep current in RAM or build on the fly +//when asked. +pub struct SimpleSync { + /// A reference to the underlying beacon chain. + chain: Arc, + /// A mapping of Peers to their respective PeerSyncInfo. + known_peers: HashMap, + /// The current state of the syncing protocol. + state: SyncState, + /// The network id, for quick HELLO RPC message lookup. + network_id: u8, + /// The latest epoch of the syncing chain. + latest_finalized_epoch: Epoch, + /// The latest block of the syncing chain. + latest_slot: Slot, + /// Sync logger. + log: slog::Logger, +} + +impl SimpleSync { + pub fn new(beacon_chain: Arc, log: &slog::Logger) -> Self { + let state = beacon_chain.get_state(); + let sync_logger = log.new(o!("Service"=> "Sync")); + SimpleSync { + chain: beacon_chain.clone(), + known_peers: HashMap::new(), + state: SyncState::Idle, + network_id: beacon_chain.get_spec().network_id, + latest_finalized_epoch: state.finalized_epoch, + latest_slot: state.slot - 1, //TODO: Build latest block function into Beacon chain and correct this + log: sync_logger, + } + } + + /// Generates our current state in the form of a HELLO RPC message. + pub fn generate_hello(&self) -> HelloMessage { + let state = &self.chain.get_state(); + //TODO: Paul to verify the logic of these fields. + HelloMessage { + network_id: self.network_id, + latest_finalized_root: state.finalized_root, + latest_finalized_epoch: state.finalized_epoch, + best_root: Hash256::zero(), //TODO: build correct value as a beacon chain function + best_slot: state.slot - 1, + } + } + + pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool { + // network id must match + if hello_message.network_id != self.network_id { + return false; + } + // compare latest epoch and finalized root to see if they exist in our chain + if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch { + // ensure their finalized root is in our chain + // TODO: Get the finalized root at hello_message.latest_epoch and ensure they match + //if (hello_message.latest_finalized_root == self.chain.get_state() { + // return false; + // } + } + + // the client is valid, add it to our list of known_peers and request sync if required + // update peer list if peer already exists + let peer_info = PeerSyncInfo { + latest_finalized_root: hello_message.latest_finalized_root, + latest_finalized_epoch: hello_message.latest_finalized_epoch, + best_root: hello_message.best_root, + best_slot: hello_message.best_slot, + }; + + debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); + self.known_peers.insert(peer_id, peer_info); + + // set state to sync + if self.state == SyncState::Idle + && hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE + { + self.state = SyncState::Downloading; + //TODO: Start requesting blocks from known peers. Ideally in batches + } + + true + } +} diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml new file mode 100644 index 0000000000..4c3333ee1d --- /dev/null +++ b/beacon_node/rpc/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "rpc" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +bls = { path = "../../eth2/utils/bls" } +beacon_chain = { path = "../beacon_chain" } + +protos = { path = "../../protos" } +grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +protobuf = "2.0.2" +clap = "2.32.0" +db = { path = "../db" } +dirs = "1.0.3" +futures = "0.1.23" +slog = "^2.2.3" +slot_clock = { path = "../../eth2/utils/slot_clock" } +slog-term = "^2.4.0" +slog-async = "^2.3.0" +types = { path = "../../eth2/types" } +ssz = { path = "../../eth2/utils/ssz" } diff --git a/beacon_node/src/rpc/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs similarity index 100% rename from beacon_node/src/rpc/beacon_block.rs rename to beacon_node/rpc/src/beacon_block.rs diff --git a/beacon_node/rpc/src/config.rs b/beacon_node/rpc/src/config.rs new file mode 100644 index 0000000000..e21c2f7a89 --- /dev/null +++ b/beacon_node/rpc/src/config.rs @@ -0,0 +1,22 @@ +use std::net::Ipv4Addr; + +/// RPC Configuration +#[derive(Debug, Clone)] +pub struct Config { + /// Enable the RPC server. + pub enabled: bool, + /// The IPv4 address the RPC will listen on. + pub listen_address: Ipv4Addr, + /// The port the RPC will listen on. + pub port: u16, +} + +impl Default for Config { + fn default() -> Self { + Config { + enabled: false, // rpc disabled by default + listen_address: Ipv4Addr::new(127, 0, 0, 1), + port: 5051, + } + } +} diff --git a/beacon_node/src/rpc/mod.rs b/beacon_node/rpc/src/lib.rs similarity index 74% rename from beacon_node/src/rpc/mod.rs rename to beacon_node/rpc/src/lib.rs index 6a18a4aa88..7f776d7d85 100644 --- a/beacon_node/src/rpc/mod.rs +++ b/beacon_node/rpc/src/lib.rs @@ -1,16 +1,18 @@ mod beacon_block; +pub mod config; mod validator; use self::beacon_block::BeaconBlockServiceInstance; use self::validator::ValidatorServiceInstance; +pub use config::Config as RPCConfig; use grpcio::{Environment, Server, ServerBuilder}; use protos::services_grpc::{create_beacon_block_service, create_validator_service}; use std::sync::Arc; -use slog::{info, Logger}; +use slog::{info, o}; -pub fn start_server(log: Logger) -> Server { - let log_clone = log.clone(); +pub fn start_server(config: &RPCConfig, log: &slog::Logger) -> Server { + let log = log.new(o!("Service"=>"RPC")); let env = Arc::new(Environment::new(1)); let beacon_block_service = { @@ -25,12 +27,12 @@ pub fn start_server(log: Logger) -> Server { let mut server = ServerBuilder::new(env) .register_service(beacon_block_service) .register_service(validator_service) - .bind("127.0.0.1", 50_051) + .bind(config.listen_address.to_string(), config.port) .build() .unwrap(); server.start(); for &(ref host, port) in server.bind_addrs() { - info!(log_clone, "gRPC listening on {}:{}", host, port); + info!(log, "gRPC listening on {}:{}", host, port); } server } diff --git a/beacon_node/src/rpc/validator.rs b/beacon_node/rpc/src/validator.rs similarity index 100% rename from beacon_node/src/rpc/validator.rs rename to beacon_node/rpc/src/validator.rs diff --git a/beacon_node/src/config/mod.rs b/beacon_node/src/config/mod.rs deleted file mode 100644 index 5c94e300c7..0000000000 --- a/beacon_node/src/config/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::fs; -use std::path::PathBuf; - -/// Stores the core configuration for this Lighthouse instance. -/// This struct is general, other components may implement more -/// specialized config structs. -#[derive(Clone)] -pub struct LighthouseConfig { - pub data_dir: PathBuf, - pub p2p_listen_port: u16, -} - -const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse"; - -impl LighthouseConfig { - /// Build a new lighthouse configuration from defaults. - pub fn default() -> Self { - let data_dir = { - let home = dirs::home_dir().expect("Unable to determine home dir."); - home.join(DEFAULT_LIGHTHOUSE_DIR) - }; - fs::create_dir_all(&data_dir) - .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir)); - let p2p_listen_port = 0; - Self { - data_dir, - p2p_listen_port, - } - } -} diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 2436d4f7c5..ea74c73766 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -1,35 +1,20 @@ extern crate slog; -mod config; -mod rpc; +mod run; -use std::path::PathBuf; - -use crate::config::LighthouseConfig; -use crate::rpc::start_server; -use beacon_chain::BeaconChain; use clap::{App, Arg}; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - MemoryDB, -}; -use fork_choice::BitwiseLMDGhost; -use slog::{error, info, o, Drain}; -use slot_clock::SystemTimeSlotClock; -use ssz::TreeHash; -use std::sync::Arc; -use types::test_utils::TestingBeaconStateBuilder; -use types::*; +use client::ClientConfig; +use slog::{error, o, Drain}; fn main() { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::CompactFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); - let log = slog::Logger::root(drain, o!()); + let logger = slog::Logger::root(drain, o!()); let matches = App::new("Lighthouse") - .version("0.0.1") - .author("Sigma Prime ") + .version(version::version().as_str()) + .author("Sigma Prime ") .about("Eth 2.0 Client") .arg( Arg::with_name("datadir") @@ -38,6 +23,13 @@ fn main() { .help("Data directory for keys and databases.") .takes_value(true), ) + .arg( + Arg::with_name("listen_address") + .long("listen-address") + .value_name("Listen Address") + .help("The Network address to listen for p2p connections.") + .takes_value(true), + ) .arg( Arg::with_name("port") .long("port") @@ -45,64 +37,34 @@ fn main() { .help("Network listen port for p2p connections.") .takes_value(true), ) + .arg( + Arg::with_name("rpc") + .long("rpc") + .value_name("RPC") + .help("Enable the RPC server.") + .takes_value(false), + ) + .arg( + Arg::with_name("rpc-address") + .long("rpc-address") + .value_name("RPCADDRESS") + .help("Listen address for RPC endpoint.") + .takes_value(true), + ) + .arg( + Arg::with_name("rpc-port") + .long("rpc-port") + .value_name("RPCPORT") + .help("Listen port for RPC endpoint.") + .takes_value(true), + ) .get_matches(); - let mut config = LighthouseConfig::default(); + // invalid arguments, panic + let config = ClientConfig::parse_args(matches, &logger).unwrap(); - // Custom datadir - if let Some(dir) = matches.value_of("datadir") { - config.data_dir = PathBuf::from(dir.to_string()); - } - - // Custom p2p listen port - if let Some(port_str) = matches.value_of("port") { - if let Ok(port) = port_str.parse::() { - config.p2p_listen_port = port; - } else { - error!(log, "Invalid port"; "port" => port_str); - return; - } - } - - // Log configuration - info!(log, ""; - "data_dir" => &config.data_dir.to_str(), - "port" => &config.p2p_listen_port); - - // Specification (presently fixed to foundation). - let spec = ChainSpec::foundation(); - - // Database (presently in-memory) - let db = Arc::new(MemoryDB::open()); - let block_store = Arc::new(BeaconBlockStore::new(db.clone())); - let state_store = Arc::new(BeaconStateStore::new(db.clone())); - - let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, &spec); - let (genesis_state, _keypairs) = state_builder.build(); - - let mut genesis_block = BeaconBlock::empty(&spec); - genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); - - // Slot clock - let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot) - .expect("Unable to load SystemTimeSlotClock"); - // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); - - // Genesis chain - let _chain_result = BeaconChain::from_genesis( - state_store.clone(), - block_store.clone(), - slot_clock, - genesis_state, - genesis_block, - spec, - fork_choice, - ); - - let _server = start_server(log.clone()); - - loop { - std::thread::sleep(std::time::Duration::from_secs(1)); + match run::run_beacon_node(config, &logger) { + Ok(_) => {} + Err(e) => error!(logger, "Beacon node failed because {:?}", e), } } diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs new file mode 100644 index 0000000000..b3b2844526 --- /dev/null +++ b/beacon_node/src/run.rs @@ -0,0 +1,51 @@ +use client::client_types::TestingClientType; +use client::error; +use client::{notifier, Client, ClientConfig}; +use futures::sync::oneshot; +use futures::Future; +use slog::info; +use std::cell::RefCell; +use tokio::runtime::Builder; + +pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> { + let mut runtime = Builder::new() + .name_prefix("main-") + .build() + .map_err(|e| format!("{:?}", e))?; + + // Log configuration + info!(log, "Listening on {:?}", &config.net_conf.listen_addresses; + "data_dir" => &config.data_dir.to_str(), + "port" => &config.net_conf.listen_port); + + // run service until ctrl-c + let (ctrlc_send, ctrlc) = oneshot::channel(); + let ctrlc_send_c = RefCell::new(Some(ctrlc_send)); + ctrlc::set_handler(move || { + if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() { + ctrlc_send.send(()).expect("Error sending ctrl-c message"); + } + }) + .map_err(|e| format!("Could not set ctrlc hander: {:?}", e))?; + + let (exit_signal, exit) = exit_future::signal(); + + let executor = runtime.executor(); + + // currently testing - using TestingClientType + let client: Client = Client::new(config, log.clone(), &executor)?; + notifier::run(&client, executor, exit); + + runtime + .block_on(ctrlc) + .map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))?; + + // perform global shutdown operations. + info!(log, "Shutting down.."); + exit_signal.fire(); + // shutdown the client + // client.exit_signal.fire(); + drop(client); + runtime.shutdown_on_idle().wait().unwrap(); + Ok(()) +} diff --git a/beacon_node/version/Cargo.toml b/beacon_node/version/Cargo.toml new file mode 100644 index 0000000000..0497408f1b --- /dev/null +++ b/beacon_node/version/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "version" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +target_info = "0.1.0" diff --git a/beacon_node/version/src/lib.rs b/beacon_node/version/src/lib.rs new file mode 100644 index 0000000000..3dcd57beff --- /dev/null +++ b/beacon_node/version/src/lib.rs @@ -0,0 +1,25 @@ +//TODO: Build the version and hash of the built lighthouse binary + +/// Version information for the Lighthouse beacon node. +// currently only supports unstable release +extern crate target_info; + +use target_info::Target; + +const TRACK: &str = "unstable"; + +/// Provides the current platform +pub fn platform() -> String { + format!("{}-{}", Target::arch(), Target::os()) +} + +/// Version of the beacon node. +// TODO: Find the sha3 hash, date and rust version used to build the beacon_node binary +pub fn version() -> String { + format!( + "Lighthouse/v{}-{}/{}", + env!("CARGO_PKG_VERSION"), + TRACK, + platform() + ) +} diff --git a/eth2/fork_choice/src/lib.rs b/eth2/fork_choice/src/lib.rs index 0d6969e89f..016cd5deab 100644 --- a/eth2/fork_choice/src/lib.rs +++ b/eth2/fork_choice/src/lib.rs @@ -96,6 +96,7 @@ impl From for ForkChoiceError { } /// Fork choice options that are currently implemented. +#[derive(Debug, Clone)] pub enum ForkChoiceAlgorithm { /// Chooses the longest chain becomes the head. Not for production. LongestChain, diff --git a/eth2/types/Cargo.toml b/eth2/types/Cargo.toml index 27aef19d64..1b5f79d23f 100644 --- a/eth2/types/Cargo.toml +++ b/eth2/types/Cargo.toml @@ -24,6 +24,7 @@ ssz = { path = "../utils/ssz" } ssz_derive = { path = "../utils/ssz_derive" } swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" } test_random_derive = { path = "../utils/test_random_derive" } +libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" } [dev-dependencies] env_logger = "0.6.0" diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index cfb88bcb83..65ea5c4d4e 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -112,6 +112,13 @@ pub struct ChainSpec { domain_deposit: u32, domain_exit: u32, domain_transfer: u32, + + /* + * Network specific parameters + * + */ + pub boot_nodes: Vec, + pub network_id: u8, } impl ChainSpec { @@ -243,6 +250,30 @@ impl ChainSpec { domain_deposit: 3, domain_exit: 4, domain_transfer: 5, + + /* + * Boot nodes + */ + boot_nodes: vec![], + network_id: 1, // foundation network id + } + } + + /// Returns a `ChainSpec` compatible with the Lighthouse testnet specification. + /// + /// Spec v0.4.0 + pub fn lighthouse_testnet() -> Self { + /* + * Lighthouse testnet bootnodes + */ + let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9000" + .parse() + .expect("correct multiaddr")]; + + Self { + boot_nodes, + network_id: 2, // lighthouse testnet network id + ..ChainSpec::few_validators() } } diff --git a/eth2/types/src/lib.rs b/eth2/types/src/lib.rs index 30b0e4a132..953a9508f9 100644 --- a/eth2/types/src/lib.rs +++ b/eth2/types/src/lib.rs @@ -85,3 +85,6 @@ pub type AttesterMap = HashMap<(u64, u64), Vec>; pub type ProposerMap = HashMap; pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature}; +pub use libp2p::floodsub::{Topic, TopicBuilder}; +pub use libp2p::multiaddr; +pub use libp2p::Multiaddr;