From 23a35c37672a05fb9267a0af66cc027a2e945460 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 23 Jan 2020 12:46:11 +0530 Subject: [PATCH] Persist/load DHT on shutdown/startup (#659) * Store dht enrs on shutdown * Load enrs on startup and add tests * Remove enr_entries from behavior * Move all dht persisting logic to `NetworkService` * Move `PersistedDht` from eth2-libp2p to network crate * Add test to confirm dht persistence * Add logging * Remove extra call to beacon_chain persist * Expose only mutable `add_enr` method from behaviour * Fix tests * Fix merge errors --- Cargo.lock | 2 + beacon_node/client/src/lib.rs | 8 -- beacon_node/eth2-libp2p/src/behaviour.rs | 11 ++ beacon_node/eth2-libp2p/src/discovery.rs | 5 + beacon_node/network/Cargo.toml | 2 + beacon_node/network/src/lib.rs | 1 + beacon_node/network/src/persisted_dht.rs | 51 +++++++ beacon_node/network/src/service.rs | 169 ++++++++++++++++++++++- beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/lib.rs | 2 + 10 files changed, 242 insertions(+), 10 deletions(-) create mode 100644 beacon_node/network/src/persisted_dht.rs diff --git a/Cargo.lock b/Cargo.lock index db034d75fd..ee72d36d06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2665,9 +2665,11 @@ dependencies = [ "eth2_ssz 0.1.2", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "genesis 0.1.0", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rlp 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "sloggers 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 2e20f1e671..a396e8646a 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -61,11 +61,3 @@ impl Client { self.libp2p_network.as_ref().map(|n| n.local_enr()) } } - -impl Drop for Client { - fn drop(&mut self) { - if let Some(beacon_chain) = &self.beacon_chain { - let _result = beacon_chain.persist(); - } - } -} diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 608cccf7dd..ddc9292014 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -3,6 +3,7 @@ use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::GossipTopic; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; +use enr::Enr; use futures::prelude::*; use libp2p::{ core::identity::Keypair, @@ -254,6 +255,16 @@ impl Behaviour { pub fn peer_unbanned(&mut self, peer_id: &PeerId) { self.discovery.peer_unbanned(peer_id); } + + /// Returns an iterator over all enr entries in the DHT. + pub fn enr_entries(&mut self) -> impl Iterator { + self.discovery.enr_entries() + } + + /// Add an ENR to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + self.discovery.add_enr(enr); + } } /// The types of events than can be obtained from polling the behaviour. diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index d2c46da1ae..a1d1fa5c9b 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -148,6 +148,11 @@ impl Discovery { self.banned_peers.remove(peer_id); } + /// Returns an iterator over all enr entries in the DHT. + pub fn enr_entries(&mut self) -> impl Iterator { + self.discovery.enr_entries() + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 906c4f938d..da8c7712c2 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dev-dependencies] sloggers = "0.3.4" +genesis = { path = "../genesis" } [dependencies] beacon_chain = { path = "../beacon_chain" } @@ -24,3 +25,4 @@ smallvec = "1.0.0" # TODO: Remove rand crate for mainnet rand = "0.7.2" fnv = "1.0.6" +rlp = "0.4.3" diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 83d4935cab..fddca767bd 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod message_handler; pub mod message_processor; +pub mod persisted_dht; pub mod service; pub mod sync; diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs new file mode 100644 index 0000000000..dd894742b5 --- /dev/null +++ b/beacon_node/network/src/persisted_dht.rs @@ -0,0 +1,51 @@ +use eth2_libp2p::Enr; +use rlp; +use store::{DBColumn, Error as StoreError, SimpleStoreItem}; + +/// 32-byte key for accessing the `DhtEnrs`. +pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; + +/// Wrapper around dht for persistence to disk. +pub struct PersistedDht { + pub enrs: Vec, +} + +impl SimpleStoreItem for PersistedDht { + fn db_column() -> DBColumn { + DBColumn::DhtEnrs + } + + fn as_store_bytes(&self) -> Vec { + rlp::encode_list(&self.enrs) + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + let rlp = rlp::Rlp::new(bytes); + let enrs: Vec = rlp + .as_list() + .map_err(|e| StoreError::RlpError(format!("{}", e)))?; + Ok(PersistedDht { enrs }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use eth2_libp2p::Enr; + use std::str::FromStr; + use std::sync::Arc; + use store::{MemoryStore, Store}; + use types::Hash256; + use types::MinimalEthSpec; + #[test] + fn test_persisted_dht() { + let store = Arc::new(MemoryStore::::open()); + let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()]; + let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); + store + .put(&key, &PersistedDht { enrs: enrs.clone() }) + .unwrap(); + let dht: PersistedDht = store.get(&key).unwrap().unwrap(); + assert_eq!(dht.enrs, enrs); + } +} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7936e23d61..e10ab4a45f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,5 +1,6 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; +use crate::persisted_dht::{PersistedDht, DHT_DB_KEY}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use core::marker::PhantomData; @@ -9,10 +10,12 @@ use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; use parking_lot::Mutex; -use slog::{debug, info, trace}; +use slog::{debug, error, info, trace}; use std::sync::Arc; +use store::Store; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; +use types::Hash256; /// The time in seconds that a peer will be banned and prevented from reconnecting. const BAN_PEER_TIMEOUT: u64 = 30; @@ -21,6 +24,8 @@ const BAN_PEER_TIMEOUT: u64 = 30; pub struct Service { libp2p_service: Arc>, libp2p_port: u16, + store: Arc, + log: slog::Logger, _libp2p_exit: oneshot::Sender<()>, _network_send: mpsc::UnboundedSender, _phantom: PhantomData, @@ -35,6 +40,8 @@ impl Service { ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::(); + // Get a reference to the beacon chain store + let store = beacon_chain.store.clone(); // launch message handler thread let message_handler_send = MessageHandler::spawn( beacon_chain, @@ -49,17 +56,32 @@ impl Service { network_log.clone(), )?)); + // Load DHT from store + let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); + let enrs: Vec = match store.get(&key) { + Ok(Some(p)) => { + let p: PersistedDht = p; + p.enrs + } + _ => Vec::new(), + }; + for enr in enrs { + libp2p_service.lock().swarm.add_enr(enr); + } + let libp2p_exit = spawn_service( libp2p_service.clone(), network_recv, message_handler_send, executor, - network_log, + network_log.clone(), config.propagation_percentage, )?; let network_service = Service { libp2p_service, libp2p_port: config.libp2p_port, + store, + log: network_log, _libp2p_exit: libp2p_exit, _network_send: network_send.clone(), _phantom: PhantomData, @@ -117,6 +139,25 @@ impl Service { pub fn libp2p_service(&self) -> Arc> { self.libp2p_service.clone() } + + /// Attempt to persist the enrs in the DHT to `self.store`. + pub fn persist_dht(&self) -> Result<(), store::Error> { + let enrs: Vec = self + .libp2p_service() + .lock() + .swarm + .enr_entries() + .map(|x| x.clone()) + .collect(); + info!( + self.log, + "Persisting DHT to store"; + "Number of peers" => format!("{}", enrs.len()), + ); + let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); + self.store.put(&key, &PersistedDht { enrs })?; + Ok(()) + } } fn spawn_service( @@ -307,3 +348,127 @@ pub enum NetworkMessage { /// Disconnect and bans a peer id. Disconnect { peer_id: PeerId }, } + +impl Drop for Service { + fn drop(&mut self) { + if let Err(e) = self.persist_dht() { + error!( + self.log, + "Failed to persist DHT on drop"; + "error" => format!("{:?}", e) + ) + } else { + info!( + self.log, + "Saved DHT state"; + ) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::builder::BeaconChainBuilder; + use eth2_libp2p::Enr; + use genesis::{generate_deterministic_keypairs, interop_genesis_state}; + use slog::Logger; + use sloggers::{null::NullLoggerBuilder, Build}; + use std::str::FromStr; + use store::{migrate::NullMigrator, SimpleDiskStore}; + use tokio::runtime::Runtime; + use types::{EthSpec, MinimalEthSpec}; + + fn get_logger() -> Logger { + let builder = NullLoggerBuilder; + builder.build().expect("should build logger") + } + + #[test] + fn test_dht_persistence() { + // Create new LevelDB store + let path = "/tmp"; + let store = Arc::new(SimpleDiskStore::open(&std::path::PathBuf::from(path)).unwrap()); + // Create a `BeaconChain` object to pass to `Service` + let validator_count = 8; + let genesis_time = 13371337; + + let log = get_logger(); + let spec = MinimalEthSpec::default_spec(); + + let genesis_state = interop_genesis_state( + &generate_deterministic_keypairs(validator_count), + genesis_time, + &spec, + ) + .expect("should create interop genesis state"); + let chain = BeaconChainBuilder::new(MinimalEthSpec) + .logger(log.clone()) + .store(store) + .store_migrator(NullMigrator) + .genesis_state(genesis_state) + .expect("should build state using recent genesis") + .dummy_eth1_backend() + .expect("should build the dummy eth1 backend") + .null_event_handler() + .testing_slot_clock(std::time::Duration::from_secs(1)) + .expect("should configure testing slot clock") + .reduced_tree_fork_choice() + .expect("should add fork choice to builder") + .build() + .expect("should build"); + let beacon_chain = Arc::new(chain); + let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap(); + let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap(); + let enrs = vec![enr1, enr2]; + + let runtime = Runtime::new().unwrap(); + + // Create new network service + let (service, _) = Service::new( + beacon_chain.clone(), + &NetworkConfig::default(), + &runtime.executor(), + log.clone(), + ) + .unwrap(); + + // Add enrs manually to dht + for enr in enrs.iter() { + service.libp2p_service().lock().swarm.add_enr(enr.clone()); + } + assert_eq!( + enrs.len(), + service + .libp2p_service() + .lock() + .swarm + .enr_entries() + .collect::>() + .len(), + "DHT should have 2 enrs" + ); + // Drop the service value + std::mem::drop(service); + + // Recover the network service from beacon chain store and fresh network config + let (recovered_service, _) = Service::new( + beacon_chain, + &NetworkConfig::default(), + &runtime.executor(), + log.clone(), + ) + .unwrap(); + assert_eq!( + enrs.len(), + recovered_service + .libp2p_service() + .lock() + .swarm + .enr_entries() + .collect::>() + .len(), + "Recovered DHT should have 2 enrs" + ); + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 8a03d00c3c..03424c08e0 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -11,6 +11,7 @@ pub enum Error { PartialBeaconStateError, HotColdDBError(HotColdDBError), DBError { message: String }, + RlpError(String), } impl From for Error { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index b15477e0ef..c4f26704df 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -171,6 +171,7 @@ pub enum DBColumn { BeaconStateRoots, BeaconHistoricalRoots, BeaconRandaoMixes, + DhtEnrs, } impl Into<&'static str> for DBColumn { @@ -187,6 +188,7 @@ impl Into<&'static str> for DBColumn { DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconHistoricalRoots => "bhr", DBColumn::BeaconRandaoMixes => "brm", + DBColumn::DhtEnrs => "dht", } } }