diff --git a/beacon_node/network/src/status.rs b/beacon_node/network/src/status.rs index fa52fddc36..ade490e00e 100644 --- a/beacon_node/network/src/status.rs +++ b/beacon_node/network/src/status.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use lighthouse_network::rpc::StatusMessage; @@ -25,9 +23,3 @@ impl ToStatusMessage for BeaconChain { }) } } - -impl ToStatusMessage for Arc> { - fn status_message(&self) -> Result { - as ToStatusMessage>::status_message(self) - } -} diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e8b67ba92a..e991e86e05 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -62,10 +62,10 @@ impl SyncNetworkContext { pub fn status_peers( &mut self, - chain: C, + chain: &C, peers: impl Iterator, ) { - if let Ok(status_message) = &chain.status_message() { + if let Ok(status_message) = chain.status_message() { for peer_id in peers { debug!( self.log, diff --git a/beacon_node/network/src/sync/range_sync/block_storage.rs b/beacon_node/network/src/sync/range_sync/block_storage.rs index 5590ac6234..5f8033bc51 100644 --- a/beacon_node/network/src/sync/range_sync/block_storage.rs +++ b/beacon_node/network/src/sync/range_sync/block_storage.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use beacon_chain::{BeaconChain, BeaconChainTypes}; use types::Hash256; @@ -8,7 +6,7 @@ pub trait BlockStorage { fn is_block_known(&self, block_root: &Hash256) -> bool; } -impl BlockStorage for Arc> { +impl BlockStorage for BeaconChain { fn is_block_known(&self, block_root: &Hash256) -> bool { self.fork_choice.read().contains_block(block_root) } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 4dc9c1d01c..512f7a989a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -17,6 +17,7 @@ use slog::{crit, debug, error}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -41,7 +42,7 @@ pub enum RangeSyncState { /// A collection of finalized and head chains currently being processed. pub struct ChainCollection { /// The beacon chain for processing. - beacon_chain: C, + beacon_chain: Arc, /// The set of finalized chains being synced. finalized_chains: FnvHashMap>, /// The set of head chains being synced. @@ -53,7 +54,7 @@ pub struct ChainCollection { } impl ChainCollection { - pub fn new(beacon_chain: C, log: slog::Logger) -> Self { + pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { ChainCollection { beacon_chain, finalized_chains: FnvHashMap::default(), @@ -406,6 +407,7 @@ impl ChainCollection { local_info: &SyncInfo, awaiting_head_peers: &mut HashMap, ) { + debug!(self.log, "Purging chains"); let local_finalized_slot = local_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); @@ -414,7 +416,10 @@ impl ChainCollection { let log_ref = &self.log; let is_outdated = |target_slot: &Slot, target_root: &Hash256| { - target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root) + let is = + target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root); + debug!(log_ref, "Chain is outdated {}", is); + is }; // Retain only head peers that remain relevant @@ -424,31 +429,35 @@ impl ChainCollection { // Remove chains that are out-dated let mut removed_chains = Vec::new(); - self.finalized_chains.retain(|id, chain| { + removed_chains.extend(self.finalized_chains.iter().filter_map(|(id, chain)| { if is_outdated(&chain.target_head_slot, &chain.target_head_root) || chain.available_peers() == 0 { debug!(log_ref, "Purging out of finalized chain"; &chain); - removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Finalized)); - false + Some((*id, chain.is_syncing(), RangeSyncType::Finalized)) } else { - true + None } - }); - self.head_chains.retain(|id, chain| { + })); + + removed_chains.extend(self.head_chains.iter().filter_map(|(id, chain)| { if is_outdated(&chain.target_head_slot, &chain.target_head_root) || chain.available_peers() == 0 { debug!(log_ref, "Purging out of date head chain"; &chain); - removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Head)); - false + Some((*id, chain.is_syncing(), RangeSyncType::Head)) } else { - true + None } - }); + })); // update the state of the collection for (id, was_syncing, sync_type) in removed_chains { + // remove each chain, updating the state for each removal. + match sync_type { + RangeSyncType::Finalized => self.finalized_chains.remove(&id), + RangeSyncType::Head => self.head_chains.remove(&id), + }; self.on_chain_removed(&id, was_syncing, sync_type); } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 2786ef410d..f6cf4199bd 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -59,9 +59,9 @@ use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This /// holds the current state of the long range sync. -pub struct RangeSync>> { +pub struct RangeSync> { /// The beacon chain for processing. - beacon_chain: C, + beacon_chain: Arc, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. awaiting_head_peers: HashMap, @@ -76,11 +76,11 @@ pub struct RangeSync>> { impl RangeSync where - C: BlockStorage + Clone + ToStatusMessage, + C: BlockStorage + ToStatusMessage, T: BeaconChainTypes, { pub fn new( - beacon_chain: C, + beacon_chain: Arc, beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { @@ -125,7 +125,7 @@ where // is OK since we since only one finalized chain at a time. // determine which kind of sync to perform and set up the chains - match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { + match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) { RangeSyncType::Finalized => { // Finalized chain search debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id); @@ -337,7 +337,7 @@ where debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); } - network.status_peers(self.beacon_chain.clone(), chain.peers()); + network.status_peers(self.beacon_chain.as_ref(), chain.peers()); let local = match self.beacon_chain.status_message() { Ok(status) => SyncInfo { @@ -376,21 +376,21 @@ mod tests { use slog::{o, Drain}; use slot_clock::SystemTimeSlotClock; - use std::sync::atomic::AtomicBool; + use std::collections::HashSet; use std::sync::Arc; use store::MemoryStore; use types::{Hash256, MinimalEthSpec as E}; #[derive(Debug)] struct FakeStorage { - is_block_known: AtomicBool, + known_blocks: RwLock>, status: RwLock, } impl Default for FakeStorage { fn default() -> Self { FakeStorage { - is_block_known: AtomicBool::new(false), + known_blocks: RwLock::new(HashSet::new()), status: RwLock::new(StatusMessage { fork_digest: [0; 4], finalized_root: Hash256::zero(), @@ -402,14 +402,24 @@ mod tests { } } - impl BlockStorage for Arc { - fn is_block_known(&self, _block_root: &store::Hash256) -> bool { - self.is_block_known - .load(std::sync::atomic::Ordering::Relaxed) + impl FakeStorage { + fn remember_block(&self, block_root: Hash256) { + self.known_blocks.write().insert(block_root); + } + + #[allow(dead_code)] + fn forget_block(&self, block_root: &Hash256) { + self.known_blocks.write().remove(block_root); } } - impl ToStatusMessage for Arc { + impl BlockStorage for FakeStorage { + fn is_block_known(&self, block_root: &store::Hash256) -> bool { + self.known_blocks.read().contains(block_root) + } + } + + impl ToStatusMessage for FakeStorage { fn status_message(&self) -> Result { Ok(self.status.read().clone()) } @@ -446,7 +456,7 @@ mod tests { globals: Arc>, } - impl RangeSync> { + impl RangeSync { fn assert_state(&self, expected_state: RangeSyncType) { assert_eq!( self.state() @@ -456,6 +466,14 @@ mod tests { expected_state ) } + + #[allow(dead_code)] + fn assert_not_syncing(&self) { + assert!( + self.state().expect("State is ok").is_none(), + "Range should not be syncing." + ); + } } impl TestRig { @@ -525,7 +543,7 @@ mod tests { let local_info = self.local_info(); let finalized_root = Hash256::random(); - let finalized_epoch = local_info.finalized_epoch + 1; + let finalized_epoch = local_info.finalized_epoch + 2; let head_slot = finalized_epoch.start_slot(E::slots_per_epoch()); let head_root = Hash256::random(); let remote_info = SyncInfo { @@ -540,11 +558,11 @@ mod tests { } } - fn range(log_enabled: bool) -> (TestRig, RangeSync>) { + fn range(log_enabled: bool) -> (TestRig, RangeSync) { let chain = Arc::new(FakeStorage::default()); let log = build_log(slog::Level::Trace, log_enabled); let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10); - let range_sync = RangeSync::>::new( + let range_sync = RangeSync::::new( chain.clone(), beacon_processor_tx, log.new(o!("component" => "range")), @@ -592,7 +610,7 @@ mod tests { #[test] fn head_chain_removed_while_finalized_syncing() { // NOTE: this is a regression test. - let (mut rig, mut range) = range(true); + let (mut rig, mut range) = range(false); // Get a peer with an advanced head let (head_peer, local_info, remote_info) = rig.head_peer(); @@ -614,4 +632,36 @@ mod tests { range.remove_peer(&mut rig.cx, &head_peer); range.assert_state(RangeSyncType::Finalized); } + + #[test] + fn state_update_while_purging() { + // NOTE: this is a regression test. + let (mut rig, mut range) = range(true); + + // Get a peer with an advanced head + let (head_peer, local_info, head_info) = rig.head_peer(); + let head_peer_root = head_info.head_root; + range.add_peer(&mut rig.cx, local_info, head_peer, head_info); + range.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _request = rig.grab_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); + let finalized_peer_root = remote_info.finalized_root; + range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); + range.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _second_request = rig.grab_request(&finalized_peer); + + // Now the chain knows both chains target roots. + rig.chain.remember_block(head_peer_root); + rig.chain.remember_block(finalized_peer_root); + + // Add an additional peer to the second chain to make range update it's status + let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); + range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); + } }