mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
Make range sync chain Id sequential (#6868)
Currently, we set the `chain_id` of range sync chains to `u64(hash(target_root, target_slot))`, which results in a long integer. ``` Jan 27 00:43:27.246 DEBG Batch downloaded, chain: 4223372036854775807, awaiting_batches: 0, batch_state: [p,E,E,E,E], blocks: 0, epoch: 0, service: range_sync ``` Instead, we can use `network_context.next_id()` as we do for all other sync items and get a unique sequential (not too big) integer as id. ``` Jan 27 00:43:27.246 DEBG Batch downloaded, chain: 4, awaiting_batches: 0, batch_state: [p,E,E,E,E], blocks: 0, epoch: 0, service: range_sync ``` Also, if a specific chain for the same target is retried later, it won't get the same ID so we can more clearly differentiate the logs associated with each attempt.
This commit is contained in:
@@ -15,7 +15,6 @@ use rand::seq::SliceRandom;
|
||||
use rand::Rng;
|
||||
use slog::{crit, debug, o, warn};
|
||||
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
|
||||
use std::hash::{Hash, Hasher};
|
||||
use strum::IntoStaticStr;
|
||||
use types::{Epoch, EthSpec, Hash256, Slot};
|
||||
|
||||
@@ -56,7 +55,7 @@ pub enum RemoveChain {
|
||||
pub struct KeepChain;
|
||||
|
||||
/// A chain identifier
|
||||
pub type ChainId = u64;
|
||||
pub type ChainId = Id;
|
||||
pub type BatchId = Epoch;
|
||||
|
||||
#[derive(Debug, Copy, Clone, IntoStaticStr)]
|
||||
@@ -127,14 +126,9 @@ pub enum ChainSyncingState {
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 {
|
||||
let mut hasher = std::collections::hash_map::DefaultHasher::new();
|
||||
(target_root, target_slot).hash(&mut hasher);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
id: Id,
|
||||
start_epoch: Epoch,
|
||||
target_head_slot: Slot,
|
||||
target_head_root: Hash256,
|
||||
@@ -145,8 +139,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
let mut peers = FnvHashMap::default();
|
||||
peers.insert(peer_id, Default::default());
|
||||
|
||||
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
||||
|
||||
SyncingChain {
|
||||
id,
|
||||
chain_type,
|
||||
@@ -165,6 +157,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this chain has the same target
|
||||
pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool {
|
||||
self.target_head_slot == target_head_slot && self.target_head_root == target_head_root
|
||||
}
|
||||
|
||||
/// Check if the chain has peers from which to process batches.
|
||||
pub fn available_peers(&self) -> usize {
|
||||
self.peers.len()
|
||||
@@ -1258,7 +1255,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
|
||||
serializer: &mut dyn slog::Serializer,
|
||||
) -> slog::Result {
|
||||
use slog::Value;
|
||||
serializer.emit_u64("id", self.id)?;
|
||||
serializer.emit_u32("id", self.id)?;
|
||||
Value::serialize(&self.start_epoch, record, "from", serializer)?;
|
||||
Value::serialize(
|
||||
&self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()),
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::metrics;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::service::api_types::Id;
|
||||
use lighthouse_network::PeerId;
|
||||
use lighthouse_network::SyncInfo;
|
||||
use slog::{crit, debug, error};
|
||||
@@ -29,9 +30,9 @@ const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
|
||||
#[derive(Clone)]
|
||||
pub enum RangeSyncState {
|
||||
/// A finalized chain is being synced.
|
||||
Finalized(u64),
|
||||
Finalized(Id),
|
||||
/// There are no finalized chains and we are syncing one more head chains.
|
||||
Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>),
|
||||
Head(SmallVec<[Id; PARALLEL_HEAD_CHAINS]>),
|
||||
/// There are no head or finalized chains and no long range sync is in progress.
|
||||
Idle,
|
||||
}
|
||||
@@ -74,7 +75,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
if syncing_id == id {
|
||||
// the finalized chain that was syncing was removed
|
||||
debug_assert!(was_syncing && sync_type == RangeSyncType::Finalized);
|
||||
let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self
|
||||
let syncing_head_ids: SmallVec<[Id; PARALLEL_HEAD_CHAINS]> = self
|
||||
.head_chains
|
||||
.iter()
|
||||
.filter(|(_id, chain)| chain.is_syncing())
|
||||
@@ -355,7 +356,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
.collect::<Vec<_>>();
|
||||
preferred_ids.sort_unstable();
|
||||
|
||||
let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new();
|
||||
let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
|
||||
for (_, _, id) in preferred_ids {
|
||||
let chain = self.head_chains.get_mut(&id).expect("known chain");
|
||||
if syncing_chains.len() < PARALLEL_HEAD_CHAINS {
|
||||
@@ -465,15 +466,17 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
sync_type: RangeSyncType,
|
||||
network: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
||||
let collection = if let RangeSyncType::Finalized = sync_type {
|
||||
&mut self.finalized_chains
|
||||
} else {
|
||||
&mut self.head_chains
|
||||
};
|
||||
match collection.entry(id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let chain = entry.get_mut();
|
||||
|
||||
match collection
|
||||
.iter_mut()
|
||||
.find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
|
||||
{
|
||||
Some((&id, chain)) => {
|
||||
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
|
||||
debug_assert_eq!(chain.target_head_root, target_head_root);
|
||||
debug_assert_eq!(chain.target_head_slot, target_head_slot);
|
||||
@@ -483,13 +486,16 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
} else {
|
||||
error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
|
||||
}
|
||||
let chain = entry.remove();
|
||||
self.on_chain_removed(&id, chain.is_syncing(), sync_type);
|
||||
let is_syncing = chain.is_syncing();
|
||||
collection.remove(&id);
|
||||
self.on_chain_removed(&id, is_syncing, sync_type);
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
None => {
|
||||
let peer_rpr = peer.to_string();
|
||||
let id = network.next_id();
|
||||
let new_chain = SyncingChain::new(
|
||||
id,
|
||||
start_epoch,
|
||||
target_head_slot,
|
||||
target_head_root,
|
||||
@@ -497,9 +503,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
sync_type.into(),
|
||||
&self.log,
|
||||
);
|
||||
debug_assert_eq!(new_chain.get_id(), id);
|
||||
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
|
||||
entry.insert(new_chain);
|
||||
collection.insert(id, new_chain);
|
||||
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
|
||||
self.update_metrics();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user