Consistent logic to select range sync start_slot

This commit is contained in:
dapplion
2025-11-14 21:15:09 -03:00
parent b5260db5e6
commit 167fef3176
4 changed files with 160 additions and 111 deletions

View File

@@ -42,12 +42,12 @@ use super::peer_sync_info::{PeerSyncType, remote_sync_type};
use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
};
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
use crate::sync::peer_sync_info::{LocalSyncInfo, PeerSyncTypeAdvanced};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{
@@ -384,16 +384,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to
/// ours that we consider it fully sync'd with respect to our current chain.
fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) {
// ensure the beacon chain still exists
let status = self.chain.status_message();
let local = SyncInfo {
head_slot: *status.head_slot(),
head_root: *status.head_root(),
finalized_epoch: *status.finalized_epoch(),
finalized_root: *status.finalized_root(),
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
};
let local = LocalSyncInfo::new(&self.chain);
let sync_type = remote_sync_type(&local, &remote, &self.chain);
// update the state of the peer.
@@ -401,9 +392,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if is_still_connected {
match sync_type {
PeerSyncType::Behind => {} // Do nothing
PeerSyncType::Advanced => {
PeerSyncType::Advanced(advanced_type) => {
self.range_sync
.add_peer(&mut self.network, local, peer_id, remote);
.add_peer(&mut self.network, local, peer_id, advanced_type);
}
PeerSyncType::FullySynced => {
// Sync considers this peer close enough to the head to not trigger range sync.
@@ -438,15 +429,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
head_root: Hash256,
head_slot: Option<Slot>,
) {
let status = self.chain.status_message();
let local = SyncInfo {
head_slot: *status.head_slot(),
head_root: *status.head_root(),
finalized_epoch: *status.finalized_epoch(),
finalized_root: *status.finalized_root(),
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
};
let local = LocalSyncInfo::new(&self.chain);
let head_slot = head_slot.unwrap_or_else(|| {
debug!(
local_head_slot = %local.head_slot,
@@ -456,18 +439,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
local.head_slot
});
let remote = SyncInfo {
head_slot,
head_root,
// Set finalized to same as local to trigger Head sync
finalized_epoch: local.finalized_epoch,
finalized_root: local.finalized_root,
earliest_available_slot: local.earliest_available_slot,
};
for peer_id in peers {
self.range_sync
.add_peer(&mut self.network, local.clone(), *peer_id, remote.clone());
self.range_sync.add_peer(
&mut self.network,
local.clone(),
*peer_id,
PeerSyncTypeAdvanced::Head {
target_root: head_root,
target_slot: head_slot,
start_epoch: local.local_irreversible_epoch,
},
);
}
}
@@ -542,7 +524,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn update_peer_sync_state(
&mut self,
peer_id: &PeerId,
local_sync_info: &SyncInfo,
local_sync_info: &LocalSyncInfo,
remote_sync_info: &SyncInfo,
sync_type: &PeerSyncType,
) -> bool {

View File

@@ -1,14 +1,16 @@
use super::manager::SLOT_IMPORT_TOLERANCE;
use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::{SyncInfo, SyncStatus as PeerSyncStatus};
use std::cmp::Ordering;
use types::{Epoch, EthSpec, Hash256, Slot};
/// The type of peer relative to our current state.
pub enum PeerSyncType {
/// The peer is on our chain and is fully synced with respect to our chain.
FullySynced,
/// The peer has a greater knowledge of the chain than us that warrants a full sync.
Advanced,
Advanced(PeerSyncTypeAdvanced),
/// A peer is behind in the sync and not useful to us for downloading blocks.
Behind,
}
@@ -18,13 +20,52 @@ impl PeerSyncType {
match self {
PeerSyncType::FullySynced => PeerSyncStatus::Synced { info: info.clone() },
PeerSyncType::Behind => PeerSyncStatus::Behind { info: info.clone() },
PeerSyncType::Advanced => PeerSyncStatus::Advanced { info: info.clone() },
PeerSyncType::Advanced(_) => PeerSyncStatus::Advanced { info: info.clone() },
}
}
}
pub enum PeerSyncTypeAdvanced {
Finalized {
target_slot: Slot,
target_root: Hash256,
start_epoch: Epoch,
},
Head {
target_slot: Slot,
target_root: Hash256,
start_epoch: Epoch,
},
}
#[derive(Clone)]
pub(crate) struct LocalSyncInfo {
pub head_slot: Slot,
pub finalized_epoch: Epoch,
pub local_irreversible_epoch: Epoch,
}
impl LocalSyncInfo {
pub fn new<T: BeaconChainTypes>(chain: &BeaconChain<T>) -> Self {
let status = chain.status_message();
// Max with the store in case the node has triggered manual finalization
let local_irreversible_epoch = std::cmp::max(
chain.head().finalized_checkpoint().epoch,
chain
.store
.get_split_slot()
.epoch(T::EthSpec::slots_per_epoch()),
);
Self {
head_slot: *status.head_slot(),
finalized_epoch: *status.finalized_epoch(),
local_irreversible_epoch,
}
}
}
pub fn remote_sync_type<T: BeaconChainTypes>(
local: &SyncInfo,
local: &LocalSyncInfo,
remote: &SyncInfo,
chain: &BeaconChain<T>,
) -> PeerSyncType {
@@ -33,6 +74,10 @@ pub fn remote_sync_type<T: BeaconChainTypes>(
let near_range_start = local.head_slot.saturating_sub(SLOT_IMPORT_TOLERANCE);
let near_range_end = local.head_slot.saturating_add(SLOT_IMPORT_TOLERANCE);
// With the remote peer's status message let's figure out if there are enough blocks to discover
// that we trigger sync from them. We don't want to sync any blocks from epochs prior to the
// local irreversible epoch. Our finalized epoch may be less than the local irreversible epoch.
match remote.finalized_epoch.cmp(&local.finalized_epoch) {
Ordering::Less => {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
@@ -63,7 +108,11 @@ pub fn remote_sync_type<T: BeaconChainTypes>(
{
// This peer has a head ahead enough of ours and we have no knowledge of their best
// block.
PeerSyncType::Advanced
PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
target_root: remote.head_root,
target_slot: remote.head_slot,
start_epoch: local.local_irreversible_epoch,
})
} else {
// This peer is either in the tolerance range, or ahead us with an already rejected
// block.
@@ -71,16 +120,43 @@ pub fn remote_sync_type<T: BeaconChainTypes>(
}
}
Ordering::Greater => {
if (local.finalized_epoch + 1 == remote.finalized_epoch
&& near_range_start <= remote.head_slot
&& remote.head_slot <= near_range_end)
|| chain.block_is_known_to_fork_choice(&remote.head_root)
{
// This peer is near enough to us to be considered synced, or
// we have already synced up to this peer's head
if chain.block_is_known_to_fork_choice(&remote.head_root) {
// We have already synced up to this peer's head
PeerSyncType::FullySynced
} else {
PeerSyncType::Advanced
let finality_advanced = remote.finalized_epoch > local.finalized_epoch + 1;
let head_advanced = remote.head_slot > near_range_end;
let finality_ahead_local_irreversible =
remote.finalized_epoch > local.local_irreversible_epoch;
if finality_advanced {
if finality_ahead_local_irreversible {
PeerSyncType::Advanced(PeerSyncTypeAdvanced::Finalized {
target_root: remote.finalized_root,
target_slot: remote
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch()),
start_epoch: local.local_irreversible_epoch,
})
} else if head_advanced {
PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
target_root: remote.head_root,
target_slot: remote.head_slot,
start_epoch: local.local_irreversible_epoch,
})
} else {
PeerSyncType::FullySynced
}
} else if head_advanced {
PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
target_root: remote.head_root,
target_slot: remote.head_slot,
start_epoch: local.local_irreversible_epoch,
})
} else {
// This peer is near enough to us to be considered synced
PeerSyncType::FullySynced
}
}
}
}

View File

@@ -7,14 +7,14 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::peer_sync_info::LocalSyncInfo;
use crate::sync::range_sync::range::AwaitingHeadPeers;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::SyncInfo;
use lighthouse_network::service::api_types::Id;
use logging::crit;
use smallvec::SmallVec;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use tracing::{debug, error};
@@ -193,24 +193,18 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn update(
&mut self,
network: &mut SyncNetworkContext<T>,
local: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
local: &LocalSyncInfo,
awaiting_head_peers: &mut AwaitingHeadPeers,
) {
// Remove any outdated finalized/head chains
self.purge_outdated_chains(local, awaiting_head_peers);
let local_head_epoch = local.head_slot.epoch(T::EthSpec::slots_per_epoch());
// Choose the best finalized chain if one needs to be selected.
self.update_finalized_chains(network, local.finalized_epoch, local_head_epoch);
self.update_finalized_chains(network, local);
if !matches!(self.state, RangeSyncState::Finalized(_)) {
// Handle head syncing chains if there are no finalized chains left.
self.update_head_chains(
network,
local.finalized_epoch,
local_head_epoch,
awaiting_head_peers,
);
self.update_head_chains(network, local, awaiting_head_peers);
}
}
@@ -253,8 +247,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
fn update_finalized_chains(
&mut self,
network: &mut SyncNetworkContext<T>,
local_epoch: Epoch,
local_head_epoch: Epoch,
local: &LocalSyncInfo,
) {
// Find the chain with most peers and check if it is already syncing
if let Some((mut new_id, max_peers)) = self
@@ -303,8 +296,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
// update the state to a new finalized state
self.state = RangeSyncState::Finalized(new_id);
if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch)
{
if let Err(remove_reason) = chain.start_syncing(
network,
local.local_irreversible_epoch,
local.head_slot.epoch(T::EthSpec::slots_per_epoch()),
) {
if remove_reason.is_critical() {
crit!(chain = new_id, reason = ?remove_reason, "Chain removed while switching chains");
} else {
@@ -321,17 +317,16 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
fn update_head_chains(
&mut self,
network: &mut SyncNetworkContext<T>,
local_epoch: Epoch,
local_head_epoch: Epoch,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
local: &LocalSyncInfo,
awaiting_head_peers: &mut AwaitingHeadPeers,
) {
// Include the awaiting head peers
for (peer_id, peer_sync_info) in awaiting_head_peers.drain() {
for (peer_id, (target_root, target_slot)) in awaiting_head_peers.drain() {
debug!("including head peer");
self.add_peer_or_create_chain(
local_epoch,
peer_sync_info.head_root,
peer_sync_info.head_slot,
local.local_irreversible_epoch,
target_root,
target_slot,
peer_id,
RangeSyncType::Head,
network,
@@ -361,9 +356,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if !chain.is_syncing() {
debug!(id = chain.id(), "New head chain started syncing");
}
if let Err(remove_reason) =
chain.start_syncing(network, local_epoch, local_head_epoch)
{
if let Err(remove_reason) = chain.start_syncing(
network,
local.local_irreversible_epoch,
local.head_slot.epoch(T::EthSpec::slots_per_epoch()),
) {
self.head_chains.remove(&id);
if remove_reason.is_critical() {
crit!(chain = id, reason = ?remove_reason, "Chain removed while switching head chains");
@@ -396,8 +393,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// finalized block slot. Peers that would create outdated chains are removed too.
pub fn purge_outdated_chains(
&mut self,
local_info: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
local_info: &LocalSyncInfo,
awaiting_head_peers: &mut AwaitingHeadPeers,
) {
let local_finalized_slot = local_info
.finalized_epoch
@@ -411,9 +408,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
};
// Retain only head peers that remain relevant
awaiting_head_peers.retain(|_peer_id, peer_sync_info| {
!is_outdated(&peer_sync_info.head_slot, &peer_sync_info.head_root)
});
awaiting_head_peers
.retain(|_peer_id, (target_root, target_slot)| !is_outdated(target_slot, target_root));
// Remove chains that are out-dated
let mut removed_chains = Vec::new();

View File

@@ -43,25 +43,27 @@ use super::chain::{ChainId, RemoveChain, SyncingChain};
use super::chain_collection::{ChainCollection, SyncChainStatus};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::status::ToStatusMessage;
use crate::sync::BatchProcessResult;
use crate::sync::batch::BatchId;
use crate::sync::network_context::{RpcResponseError, SyncNetworkContext};
use crate::sync::peer_sync_info::{LocalSyncInfo, PeerSyncTypeAdvanced};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::PeerId;
use lighthouse_network::rpc::GoodbyeReason;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerId, SyncInfo};
use logging::crit;
use lru_cache::LRUTimeCache;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, trace, warn};
use types::{Epoch, EthSpec, Hash256};
use types::{Epoch, EthSpec, Hash256, Slot};
/// For how long we store failed finalized chains to prevent retries.
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
pub(crate) type AwaitingHeadPeers = HashMap<PeerId, (Hash256, Slot)>;
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
/// holds the current state of the long range sync.
@@ -70,7 +72,7 @@ pub struct RangeSync<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
awaiting_head_peers: AwaitingHeadPeers,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T>,
@@ -110,29 +112,28 @@ where
pub fn add_peer(
&mut self,
network: &mut SyncNetworkContext<T>,
local_info: SyncInfo,
local_info: LocalSyncInfo,
peer_id: PeerId,
remote_info: SyncInfo,
advanced_type: PeerSyncTypeAdvanced,
) {
// evaluate which chain to sync from
// determine if we need to run a sync to the nearest finalized state or simply sync to
// its current head
// convenience variable
let remote_finalized_slot = remote_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
// NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. This
// is OK since we since only one finalized chain at a time.
// determine which kind of sync to perform and set up the chains
match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) {
RangeSyncType::Finalized => {
match advanced_type {
PeerSyncTypeAdvanced::Finalized {
target_root,
target_slot,
start_epoch,
} => {
// Make sure we have not recently tried this chain
if self.failed_chains.contains(&remote_info.finalized_root) {
debug!(failed_root = ?remote_info.finalized_root, %peer_id,"Disconnecting peer that belongs to previously failed chain");
if self.failed_chains.contains(&target_root) {
debug!(failed_root = ?target_root, %peer_id,"Disconnecting peer that belongs to previously failed chain");
network.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
return;
}
@@ -145,15 +146,14 @@ where
// to using exact epoch boundaries for batches (rather than one slot past the epoch
// boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's
// finalized slot in order to finalize the chain locally.
let target_head_slot =
remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1;
let target_head_slot = target_slot + (2 * T::EthSpec::slots_per_epoch()) + 1;
// Note: We keep current head chains. These can continue syncing whilst we complete
// this new finalized chain.
self.chains.add_peer_or_create_chain(
local_info.finalized_epoch,
remote_info.finalized_root,
start_epoch,
target_root,
target_head_slot,
peer_id,
RangeSyncType::Finalized,
@@ -163,14 +163,19 @@ where
self.chains
.update(network, &local_info, &mut self.awaiting_head_peers);
}
RangeSyncType::Head => {
PeerSyncTypeAdvanced::Head {
target_root,
target_slot,
start_epoch,
} => {
// This peer requires a head chain sync
if self.chains.is_finalizing_sync() {
// If there are finalized chains to sync, finish these first, before syncing head
// chains.
trace!(%peer_id, awaiting_head_peers = &self.awaiting_head_peers.len(),"Waiting for finalized sync to complete");
self.awaiting_head_peers.insert(peer_id, remote_info);
self.awaiting_head_peers
.insert(peer_id, (target_root, target_slot));
return;
}
@@ -181,12 +186,10 @@ where
// The new peer has the same finalized (earlier filters should prevent a peer with an
// earlier finalized chain from reaching here).
let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot)
.epoch(T::EthSpec::slots_per_epoch());
self.chains.add_peer_or_create_chain(
start_epoch,
remote_info.head_root,
remote_info.head_slot,
target_root,
target_slot,
peer_id,
RangeSyncType::Head,
network,
@@ -357,16 +360,8 @@ where
network.status_peers(self.beacon_chain.as_ref(), chain.peers());
let status = self.beacon_chain.status_message();
let local = SyncInfo {
head_slot: *status.head_slot(),
head_root: *status.head_root(),
finalized_epoch: *status.finalized_epoch(),
finalized_root: *status.finalized_root(),
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
};
// update the state of the collection
let local = LocalSyncInfo::new(&self.beacon_chain);
self.chains
.update(network, &local, &mut self.awaiting_head_peers);
}