diff --git a/Cargo.toml b/Cargo.toml index 0207b3d1b2..a15c84fe07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ rlp = { git = "https://github.com/paritytech/parity-common" } slog = "^2.2.3" slog-term = "^2.4.0" slog-async = "^2.3.0" +tokio = "0.1" +# Old tokio required for libp2p tokio-io = "0.1" tokio-core = "0.1" tokio-timer = "0.1" diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000000..af7af84a93 --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1,26 @@ +use std::env; +use std::path::PathBuf; + +#[derive(Clone)] +pub struct LighthouseConfig { + pub data_dir: PathBuf, + pub p2p_listen_port: String, +} + +const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse"; + +impl LighthouseConfig { + /// Build a new lighthouse configuration from defaults. + pub fn default() -> Self{ + let data_dir = { + let home = env::home_dir() + .expect("Unable to determine home dir."); + home.join(DEFAULT_LIGHTHOUSE_DIR) + }; + let p2p_listen_port = "0".to_string(); + Self { + data_dir, + p2p_listen_port, + } + } +} diff --git a/src/main.rs b/src/main.rs index b5b98c6b56..7644d10992 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,15 +8,18 @@ extern crate libp2p_peerstore; pub mod p2p; pub mod pubkeystore; pub mod state; +pub mod sync; pub mod utils; +pub mod config; use std::path::PathBuf; use slog::Drain; use clap::{ Arg, App }; -use p2p::config::NetworkConfig; +use config::LighthouseConfig; use p2p::service::NetworkService; use p2p::state::NetworkState; +use sync::sync_start; fn main() { let decorator = slog_term::TermDecorator::new().build(); @@ -40,17 +43,16 @@ fn main() { .takes_value(true)) .get_matches(); - let mut config = NetworkConfig::default(); + let mut config = LighthouseConfig::default(); // Custom datadir if let Some(dir) = matches.value_of("datadir") { config.data_dir = PathBuf::from(dir.to_string()); } - // Custom listen port + // Custom p2p listen port if let Some(port) = matches.value_of("port") { - config.listen_multiaddr = - NetworkConfig::multiaddr_on_port(&port.to_string()); + config.p2p_listen_port = port.to_string(); } info!(log, ""; "data_dir" => &config.data_dir.to_str()); @@ -58,10 +60,8 @@ fn main() { // keys::generate_keys(&log).expect("Failed to generate keys"); } else { let mut state = NetworkState::new(config, &log).expect("setup failed"); - let service = NetworkService::new(state, log.new(o!())); - service.send(vec![31, 32, 33]); - service.bg_thread.join().unwrap(); - + let (service, net_rx) = NetworkService::new(state, log.new(o!())); + sync_start(service, net_rx, log.new(o!())); } info!(log, "Exiting."); } diff --git a/src/p2p/config.rs b/src/p2p/config.rs deleted file mode 100644 index 9513ea0aa7..0000000000 --- a/src/p2p/config.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::env; -use std::path::PathBuf; - -use super::libp2p_core::Multiaddr; - -#[derive(Clone)] -pub struct NetworkConfig { - pub data_dir: PathBuf, - pub listen_multiaddr: Multiaddr, -} - -const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse"; - -impl NetworkConfig { - pub fn default() -> Self{ - let data_dir = { - let home = env::home_dir() - .expect("Unable to determine home dir."); - home.join(DEFAULT_LIGHTHOUSE_DIR) - }; - Self { - data_dir, - listen_multiaddr: NetworkConfig::multiaddr_on_port("0") - - } - } - - /// Return a TCP multiaddress on 0.0.0.0 for a given port. - pub fn multiaddr_on_port(port: &str) -> Multiaddr { - return format!("/ip4/0.0.0.0/tcp/{}", port) - .parse::().unwrap() - } -} diff --git a/src/p2p/keys.rs b/src/p2p/keys.rs deleted file mode 100644 index 8d006e80ee..0000000000 --- a/src/p2p/keys.rs +++ /dev/null @@ -1,40 +0,0 @@ -extern crate secp256k1; -extern crate libp2p_peerstore; -extern crate rand; -extern crate hex; - -use std; -use std::io::prelude::*; -use std::fs::File; -use slog::Logger; - -use std::io::Error as IoError; -use std::path::Path; -use super::config::NetworkConfig; -use self::secp256k1::key::{ SecretKey, PublicKey }; -use self::libp2p_peerstore::PeerId; - -const LOCAL_PK_FILE: &str = "local.pk"; -const LOCAL_SK_FILE: &str = "local.sk"; -const BOOTSTRAP_PK_FILE: &str = "bootstrap.pk"; - -/// Generates a new public and secret key pair and writes them to -/// individual files. -/// -/// This function should only be present during -/// early development states and should be removed. -pub fn generate_keys(config: NetworkConfig, log: &Logger) - -> Result<(), IoError> -{ - // TODO: remove this method and import pem files instead - info!(log, "Generating keys..."); - let mut rng = rand::thread_rng(); - let curve = secp256k1::Secp256k1::new(); - let s = SecretKey::new(&curve, &mut rng); - let s_vec = &s[..]; - let s_string = hex::encode(s_vec); - let mut s_file = File::create(LOCAL_SK_FILE)?; - info!(log, "Writing secret key..."); - s_file.write(s_string.as_bytes())?; - Ok(()) -} diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 460386f336..a3aa609ce1 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -6,5 +6,3 @@ extern crate slog; pub mod service; pub mod state; -// pub mod keys; -pub mod config; diff --git a/src/p2p/service.rs b/src/p2p/service.rs index 9cb11d0ba9..f92b1c84f0 100644 --- a/src/p2p/service.rs +++ b/src/p2p/service.rs @@ -10,6 +10,7 @@ extern crate bigint; extern crate bytes; extern crate futures; extern crate libp2p_peerstore; +extern crate libp2p_floodsub; extern crate libp2p_identify; extern crate libp2p_core; extern crate libp2p_mplex; @@ -24,50 +25,77 @@ extern crate tokio_stdin; use super::state::NetworkState; use self::bigint::U512; use self::futures::{ Future, Stream, Poll }; -use self::futures::sync::mpsc::{ unbounded, UnboundedSender, UnboundedReceiver }; -use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade }; +use self::futures::sync::mpsc::{ + unbounded, UnboundedSender, UnboundedReceiver +}; +use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, + Transport, ConnectionUpgrade }; use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; +use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade }; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::slog::Logger; -use std::sync::{ Arc, RwLock }; +use std::sync::{ Arc, RwLock, Mutex }; use std::time::{ Duration, Instant }; -// use std::sync::mpsc::{ channel, Sender, Receiver }; use std::thread; use std::ops::Deref; use std::io::Error as IoError; +use std::collections::VecDeque; use self::tokio_io::{ AsyncRead, AsyncWrite }; use self::bytes::Bytes; +pub use self::libp2p_floodsub::Message; + pub struct NetworkService { - tx: UnboundedSender>, + app_to_net: UnboundedSender>, pub bg_thread: thread::JoinHandle<()>, + msgs: Mutex>, } impl NetworkService { - pub fn new(state: NetworkState, log: Logger) -> Self { - let (tx, rx) = unbounded(); - let net_rx = NetworkReciever{ inner: rx }; + /// Create a new network service. Spawns a new thread running tokio + /// services. Accepts a NetworkState, which is derived from a NetworkConfig. + /// Also accepts a logger. + pub fn new(state: NetworkState, log: Logger) + -> (Self, UnboundedReceiver>) + { + let (input_tx, input_rx) = unbounded(); // app -> net + let (output_tx, output_rx) = unbounded(); // net -> app let bg_thread = thread::spawn(move || { - listen(state, net_rx, log); + listen(state, output_tx, input_rx, log); }); - - Self { - tx, + let msgs = Mutex::new(VecDeque::new()); + let ns = Self { + app_to_net: input_tx, bg_thread, - } + msgs, + }; + (ns, output_rx) } + /// Sends a message (byte vector) to the network. The present network + /// determines which the recipients of the message. pub fn send(&self, msg: Vec) { - self.tx.unbounded_send(msg); + self.app_to_net.unbounded_send(msg).expect("unable to contact network") + } + + pub fn read_message(&mut self) + -> Option + { + let mut buf = self.msgs.lock().unwrap(); + buf.pop_front() } } -pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) +fn listen(state: NetworkState, + app_tx: UnboundedSender>, + raw_rx: UnboundedReceiver>, + log: Logger) { let peer_store = state.peer_store; let peer_id = state.peer_id; let listen_multiaddr = state.listen_multiaddr; let listened_addrs = Arc::new(RwLock::new(vec![])); + let rx = ApplicationReciever{ inner: raw_rx }; // Build a tokio core let mut core = tokio_core::reactor::Core::new().expect("tokio failure."); @@ -113,11 +141,17 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) KademliaControllerPrototype::new(kad_config); let kad_upgrade = libp2p_kad:: KademliaUpgrade::from_prototype(&kad_ctl_proto); + + // Build a floodsub upgrade to allow pushing topic'ed + // messages across the network. + let (floodsub_upgrade, floodsub_rx) = + FloodSubUpgrade::new(peer_id.clone()); // Combine the Kademlia and Identify upgrades into a single // upgrader struct. let upgrade = ConnectionUpgrader { kad: kad_upgrade.clone(), + floodsub: floodsub_upgrade.clone(), identify: libp2p_identify::IdentifyProtocolConfig, }; @@ -128,6 +162,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) identify_transport.clone().with_upgrade(upgrade), move |upgrade, client_addr| match upgrade { FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>, + FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>, FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send( IdentifyInfo { public_key: swarm_peer_id.clone().into_bytes(), @@ -137,6 +172,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) protocols: vec![ "/ipfs/kad/1.0.0".to_owned(), "/ipfs/id/1.0.0".to_owned(), + "/floodsub/1.0.0".to_owned(), ] }, &client_addr @@ -159,6 +195,11 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) swarm_ctl.clone(), identify_transport.clone().with_upgrade(kad_upgrade.clone())); + // Create a new floodsub controller using a specific topic + let topic = libp2p_floodsub::TopicBuilder::new("beacon_chain").build(); + let floodsub_ctl = libp2p_floodsub::FloodSubController::new(&floodsub_upgrade); + floodsub_ctl.subscribe(&topic); + // Generate a tokio timer "wheel" future that sends kad FIND_NODE at // a routine interval. let kad_poll_log = log.new(o!()); @@ -180,7 +221,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); let dial_result = swarm_ctl.dial( peer_addr, - identify_transport.clone().with_upgrade(kad_upgrade.clone()) + identify_transport.clone().with_upgrade(floodsub_upgrade.clone()) ); if let Err(err) = dial_result { warn!(kad_poll_log, "Dialling {:?} failed.", err) @@ -190,35 +231,36 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger) }) }; - let kad_send_log = log.new(o!()); - let kad_send = rx.for_each(|msg| { - if let Ok(msg) = String::from_utf8(msg) { - info!(kad_send_log, "message: {:?}", msg); - } + // Create a future to handle message recieved from the network + let floodsub_rx = floodsub_rx.for_each(|msg| { + debug!(&log, "Network receive"; "msg" => format!("{:?}", msg)); + app_tx.unbounded_send(msg.data) + .expect("Network unable to contact application."); Ok(()) }); - // Generate a future featuring the kad init future - // and the kad polling cycle. + // Create a future to handle messages recieved from the application + let app_rx = rx.for_each(|msg| { + debug!(&log, "Network publish"; "msg" => format!("{:?}", msg)); + floodsub_ctl.publish(&topic, msg); + Ok(()) + }); + + // Generate a full future let final_future = swarm_future - .select(kad_send) - .map_err(|(err, _)| err) - .map(|((), _)| ()) - .select(kad_poll) - .map_err(|(err, _)| err) - .map(|((), _)| ()) - .select(kad_init) - .map_err(|(err, _)| err) - .and_then(|((), n)| n); + .select(floodsub_rx).map_err(|(err, _)| err).map(|((), _)| ()) + .select(app_rx).map_err(|(err, _)| err).map(|((), _)| ()) + .select(kad_poll).map_err(|(err, _)| err).map(|((), _)| ()) + .select(kad_init).map_err(|(err, _)| err).and_then(|((), n)| n); core.run(final_future).unwrap(); } -pub struct NetworkReciever { +struct ApplicationReciever { inner: UnboundedReceiver>, } -impl Stream for NetworkReciever { +impl Stream for ApplicationReciever { type Item = Vec; type Error = IoError; @@ -233,6 +275,7 @@ impl Stream for NetworkReciever { struct ConnectionUpgrader { kad: KademliaUpgrade, identify: libp2p_identify::IdentifyProtocolConfig, + floodsub: FloodSubUpgrade, } impl ConnectionUpgrade for ConnectionUpgrader @@ -252,6 +295,7 @@ where vec![ (Bytes::from("/ipfs/kad/1.0.0"), 0), (Bytes::from("/ipfs/id/1.0.0"), 1), + (Bytes::from("/floodsub/1.0.0"), 2), ].into_iter() } @@ -272,6 +316,11 @@ where self.identify .upgrade(socket, (), ty, remote_addr) .map(|upg| upg.into())), + 2 => Box::new( + self.floodsub + .upgrade(socket, (), ty, remote_addr) + .map(|upg| upg.into()), + ), _ => unreachable!() } @@ -280,11 +329,11 @@ where enum FinalUpgrade { Kad(KademliaProcessingFuture), - Identify(IdentifyOutput) + Identify(IdentifyOutput), + FloodSub(FloodSubFuture), } -impl From for FinalUpgrade { - #[inline] +impl From for FinalUpgrade { #[inline] fn from(upgrade: libp2p_kad::KademliaProcessingFuture) -> Self { FinalUpgrade::Kad(upgrade) } @@ -296,3 +345,10 @@ impl From> for FinalUpgrade { FinalUpgrade::Identify(upgrade) } } + +impl From for FinalUpgrade { + #[inline] + fn from(upgr: FloodSubFuture) -> Self { + FinalUpgrade::FloodSub(upgr) + } +} diff --git a/src/p2p/state.rs b/src/p2p/state.rs index 982e13cfdd..18bf7433b7 100644 --- a/src/p2p/state.rs +++ b/src/p2p/state.rs @@ -6,7 +6,7 @@ use std::fs::File; use std::sync::Arc; use std::time::Duration; -use super::config::NetworkConfig; +use super::super::config::LighthouseConfig; use super::libp2p_core::Multiaddr; use super::libp2p_peerstore::{ Peerstore, PeerAccess, PeerId }; use super::libp2p_peerstore::json_peerstore::JsonPeerstore; @@ -20,7 +20,7 @@ const PEERS_FILE: &str = "peerstore.json"; const LOCAL_PEM_FILE: &str = "local_peer_id.pem"; pub struct NetworkState { - pub config: NetworkConfig, + pub config: LighthouseConfig, pub pubkey: PublicKey, pub seckey: SecretKey, pub peer_id: PeerId, @@ -29,7 +29,7 @@ pub struct NetworkState { } impl NetworkState { - pub fn new(config: NetworkConfig, log: &Logger) -> Result > { + pub fn new(config: LighthouseConfig, log: &Logger) -> Result > { let curve = Secp256k1::new(); let seckey = match NetworkState::load_secret_key_from_pem_file(&config, &curve) @@ -47,7 +47,9 @@ impl NetworkState { Arc::new(base) }; info!(log, "Loaded peerstore"; "peer_count" => &peer_store.peers().count()); - let listen_multiaddr = config.listen_multiaddr.clone(); + // let listen_multiaddr = config.listen_multiaddr.clone(); + let listen_multiaddr = + NetworkState::multiaddr_on_port(&config.p2p_listen_port); Ok(Self { config: config, seckey, @@ -58,6 +60,12 @@ impl NetworkState { }) } + /// Return a TCP multiaddress on 0.0.0.0 for a given port. + pub fn multiaddr_on_port(port: &str) -> Multiaddr { + return format!("/ip4/0.0.0.0/tcp/{}", port) + .parse::().unwrap() + } + pub fn add_peer(&mut self, peer_id: PeerId, multiaddr: Multiaddr, @@ -67,7 +75,7 @@ impl NetworkState { } /// Instantiate a SecretKey from a .pem file on disk. - pub fn load_secret_key_from_pem_file(config: &NetworkConfig, curve: &Secp256k1) + pub fn load_secret_key_from_pem_file(config: &LighthouseConfig, curve: &Secp256k1) -> Result> { let path = config.data_dir.join(LOCAL_PEM_FILE); @@ -81,7 +89,7 @@ impl NetworkState { /// Generate a new SecretKey and store it on disk as a .pem file. pub fn generate_new_secret_key( - config: &NetworkConfig, + config: &LighthouseConfig, curve: &Secp256k1) -> Result> { diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000000..6ccceeb671 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,38 @@ +extern crate futures; +extern crate slog; +extern crate tokio; + +use super::p2p::service::NetworkService; +use self::futures::sync::mpsc::UnboundedReceiver; +use self::futures::Stream; +use slog::Logger; +use self::tokio::timer::Interval; +use self::tokio::prelude::*; + +use std::time::{ Duration, Instant }; + +pub fn sync_start(service: NetworkService, + net_stream: UnboundedReceiver>, + log: Logger) +{ + let net_rx = net_stream + .for_each(move |msg| { + debug!(&log, "Sync receive"; "msg" => format!("{:?}", msg)); + // service.send("hello".to_bytes()); + Ok(()) + }) + .map_err(|_| panic!("rx failed")); + + let poll = Interval::new(Instant::now(), Duration::from_secs(2)) + .for_each(move |_| { + service.send(vec![42, 42, 42]); + Ok(()) + }) + .map_err(|_| panic!("send failed")); + + let sync_future = poll + .select(net_rx).map_err(|(err, _)| err) + .and_then(|((), n)| n); + + tokio::run(sync_future); +}