Resolve more TODOs

This commit is contained in:
dapplion
2025-05-27 15:28:03 -05:00
parent 52722b7b2e
commit fc3922f854
6 changed files with 66 additions and 112 deletions

View File

@@ -21,8 +21,9 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
use itertools::Itertools;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::PeerAction;
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use parking_lot::RwLock;
use std::collections::{
btree_map::{BTreeMap, Entry},
HashMap, HashSet,
@@ -135,6 +136,8 @@ pub struct BackFillSync<T: BeaconChainTypes> {
/// This signifies that we are able to attempt to restart a failed chain.
restart_failed_sync: bool,
peers: Arc<RwLock<HashSet<PeerId>>>,
/// Reference to the beacon chain to obtain initial starting points for the backfill sync.
beacon_chain: Arc<BeaconChain<T>>,
@@ -179,6 +182,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
current_processing_batch: None,
validated_batches: 0,
restart_failed_sync: false,
peers: <_>::default(),
beacon_chain,
};
@@ -218,14 +222,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
match self.state() {
BackFillState::Syncing => {} // already syncing ignore.
BackFillState::Paused => {
if self
.network_globals
.peers
.read()
.synced_peers()
.next()
.is_some()
{
if !self.peers.read().is_empty() {
// If there are peers to resume with, begin the resume.
debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
self.set_state(BackFillState::Syncing);
@@ -298,6 +295,14 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
/// Registers `peer_id` as available for backfill batch requests.
/// Called when sync classifies a connected peer as usable for backfill
/// (see the `PeerSyncType::Advanced` arm in `SyncManager`).
pub fn add_peer(&mut self, peer_id: PeerId) {
self.peers.write().insert(peer_id);
}
/// Removes `peer_id` from the backfill peer pool after it disconnects,
/// so future batch requests are not routed to it.
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.peers.write().remove(peer_id);
}
/// An RPC error has occurred.
///
/// If the batch exists it is re-requested.
@@ -920,20 +925,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let synced_peers = self
.network_globals
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();
let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers();
match network.block_components_by_range_request(
request,
RangeRequestId::BackfillSync { batch_id },
&synced_peers,
self.peers.clone(),
&failed_peers,
// Does not track total requests per peers for now
&HashMap::new(),

View File

@@ -413,6 +413,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
PeerSyncType::Advanced => {
self.range_sync
.add_peer(&mut self.network, local, peer_id, remote);
self.backfill_sync.add_peer(peer_id);
}
PeerSyncType::FullySynced => {
// Sync considers this peer close enough to the head to not trigger range sync.
@@ -530,6 +531,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// Remove peer from all data structures
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.backfill_sync.peer_disconnected(peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.

View File

@@ -478,7 +478,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request: BlocksByRangeRequest,
requester: RangeRequestId,
peers: &HashSet<PeerId>,
peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>,
total_requests_per_peer: &HashMap<PeerId, usize>,
) -> Result<Id, RpcRequestSendError> {
@@ -498,7 +498,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.block_components_by_range_requests.insert(id, req);
// TODO: use ID
Ok(id.id)
}

View File

@@ -91,7 +91,7 @@ impl From<Error> for RpcRequestSendError {
}
}
/// FOR TESTING ONLY
/// Used for type-safe assertions of state in range sync tests
#[cfg(test)]
#[derive(Debug)]
pub enum BlockComponentsByRangeRequestStep {
@@ -103,7 +103,7 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
pub fn new(
id: ComponentsByRangeRequestId,
request: BlocksByRangeRequest,
peers: &HashSet<PeerId>,
peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>,
total_requests_per_peer: &HashMap<PeerId, usize>,
cx: &mut SyncNetworkContext<T>,
@@ -123,6 +123,7 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
// will request all blocks for the first 5 epochs to that same single peer. Before we would
// query only idle peers in the syncing chain.
let Some(block_peer) = peers
.read()
.iter()
.map(|peer| {
(
@@ -180,9 +181,7 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
Ok(Self {
id,
// TODO(das): share the rwlock with the range sync batch. Are peers added to the batch
// after being created?
peers: Arc::new(RwLock::new(peers.clone())),
peers,
request,
state,
})
@@ -511,8 +510,6 @@ fn couple_blocks_fulu<E: EthSpec>(
.remove(&block_root)
.unwrap_or_default();
// TODO(das): Change RpcBlock to holding a Vec of DataColumnSidecars so we don't need
// the spec here.
RpcBlock::new_with_custody_columns(
Some(block_root),
block,

View File

@@ -1,5 +1,4 @@
use super::custody_by_root::{ColumnRequest, Error};
use crate::sync::network_context::RpcResponseError;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
@@ -22,7 +21,7 @@ use types::{
use super::{PeerGroup, RpcResponseResult, SyncNetworkContext};
const TEMPORARY_FAULT_EXPIRY_SECONDS: u64 = 15;
const FAILED_PEERS_EXPIRY_SECONDS: u64 = 15;
const REQUEST_EXPIRY_SECONDS: u64 = 300;
pub struct ActiveCustodyByRangeRequest<T: BeaconChainTypes> {
@@ -41,13 +40,7 @@ pub struct ActiveCustodyByRangeRequest<T: BeaconChainTypes> {
FnvHashMap<DataColumnsByRangeRequestId, ActiveBatchColumnsRequest>,
/// Peers that have recently failed to successfully respond to a columns by range request.
/// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
peers_with_custody_failures: LRUTimeCache<PeerId>,
peers_with_temporary_faults: LRUTimeCache<PeerId>,
// TODO(das): does this HashSet has an OOM risk? We should either: make sure that this request
// structs are dropped after some time, that disconnected peers are pruned (but we may want to
// retain faulty information if they just disconnect and reconnect) or make this an LRUTimeCache
// with a long time (like 5 minutes).
peers_with_permanent_faults: HashSet<PeerId>,
failed_peers: LRUTimeCache<PeerId>,
/// Set of peers that claim to have imported this block and their custody columns
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
@@ -89,13 +82,7 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
.map(|index| (*index, ColumnRequest::new())),
),
active_batch_columns_requests: <_>::default(),
peers_with_custody_failures: LRUTimeCache::new(Duration::from_secs(
TEMPORARY_FAULT_EXPIRY_SECONDS,
)),
peers_with_temporary_faults: LRUTimeCache::new(Duration::from_secs(
TEMPORARY_FAULT_EXPIRY_SECONDS,
)),
peers_with_permanent_faults: HashSet::new(),
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_EXPIRY_SECONDS)),
lookup_peers,
_phantom: PhantomData,
}
@@ -138,7 +125,7 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
}
// Accumulate columns that the peer does not have to issue a single log per request
let mut missing_column_indexes = vec![];
let mut missing_column_indices = vec![];
let mut incorrect_column_indices = vec![];
let mut imported_column_indices = vec![];
@@ -178,14 +165,8 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
// - peer custodies this column `index`
// - peer claims to be synced to at least `slot`
//
// Therefore not returning this column is a protocol violation that we
// penalize and mark the peer as failed to retry with another peer.
//
// TODO(das) do not consider this case a success. We know for sure the block has
// data. However we allow the peer to return empty as we can't attribute fault.
// TODO(das): Should track which columns are missing and eventually give up
// TODO(das): If the peer is in the lookup peer set it claims to have imported
// the block AND its custody columns. So in this case we can downscore
// Then we penalize the faulty peer, mark it as failed and try with
// another.
Err(ColumnResponseError::MissingColumn(slot))
}
})
@@ -219,15 +200,15 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
));
}
ColumnResponseError::MissingColumn(slot) => {
missing_column_indexes.push((index, slot));
missing_column_indices.push((index, slot));
}
}
}
}
}
// Log missing_column_indexes and incorrect_column_indices here in batch per request
// to make this logs more compact and less noisy.
// Log `imported_column_indices`, `missing_column_indices` and
// `incorrect_column_indices` once per request to make the logs less noisy.
if !imported_column_indices.is_empty() {
// TODO(das): this log may be redundant. We already log on DataColumnsByRange
// completed, and on DataColumnsByRange sent we log the column indices
@@ -246,21 +227,18 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
}
if !incorrect_column_indices.is_empty() {
// Note: Batch logging that columns are missing to not spam logger
debug!(
id = %self.id,
data_columns_by_range_req_id = %req_id,
%peer_id,
// TODO(das): this property can become very noisy, being the full range 0..128
incorrect_columns = ?incorrect_column_indices,
?incorrect_column_indices,
"Custody by range peer returned non-matching columns"
);
// Returning a non-canonical column is not a permanent fault. We should not
// retry the peer for some time but the peer may return a canonical column in
// the future.
// TODO(das): if this finalized sync the fault is permanent
self.peers_with_temporary_faults.insert(peer_id);
self.failed_peers.insert(peer_id);
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
@@ -268,19 +246,17 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
);
}
if !missing_column_indexes.is_empty() {
// Note: Batch logging that columns are missing to not spam logger
if !missing_column_indices.is_empty() {
debug!(
id = %self.id,
data_columns_by_range_req_id = %req_id,
%peer_id,
// TODO(das): this property can become very noisy, being the full range 0..128
?missing_column_indexes,
?missing_column_indices,
"Custody by range peer claims to not have some data"
);
// Not having columns is not a permanent fault. The peer may be backfilling.
self.peers_with_custody_failures.insert(peer_id);
self.failed_peers.insert(peer_id);
cx.report_peer(peer_id, PeerAction::MidToleranceError, "custody_failure");
}
}
@@ -293,7 +269,6 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
"Custody by range download error"
);
// TODO(das): Should mark peer as failed and try from another peer
for column_index in &batch_request.indices {
self.column_requests
.get_mut(column_index)
@@ -301,22 +276,8 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
.on_download_error_and_mark_failure(req_id, err.clone())?;
}
match err {
// Verify errors are correctness errors against our request or about the
// returned data itself. This peer is faulty or malicious, should not be
// retried.
RpcResponseError::VerifyError(_) => {
self.peers_with_permanent_faults.insert(peer_id);
}
// Network errors are not permanent faults and worth retrying
RpcResponseError::RpcError(_) => {
self.peers_with_temporary_faults.insert(peer_id);
}
// Do nothing for internal errors
RpcResponseError::InternalError(_) => {}
// unreachable
RpcResponseError::RequestExpired(_) => {}
}
// An RpcResponseError is already downscored in network_context
self.failed_peers.insert(peer_id);
}
};
@@ -386,18 +347,13 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
let mut priorized_peers = custodial_peers
.iter()
.filter(|peer| {
// Never request again peers with permanent faults
// Do not request peers with custody failures for some time
!self.peers_with_permanent_faults.contains(peer)
&& !self.peers_with_custody_failures.contains(peer)
// Do not request faulty peers for some time
!self.failed_peers.contains(peer)
})
.map(|peer| {
(
// Prioritize peers that claim to have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that have failed to successfully respond to
// requests recently, but allow to immediatelly request them again
self.peers_with_temporary_faults.contains(peer),
// Prefer peers with fewer requests to load balance across peers.
// We batch requests to the same peer, so count existence in the
// `columns_to_request_by_peer` as a single 1 request.
@@ -411,7 +367,7 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
.collect::<Vec<_>>();
priorized_peers.sort_unstable();
if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
if let Some((_, _, _, peer_id)) = priorized_peers.first() {
columns_to_request_by_peer
.entry(*peer_id)
.or_default()

View File

@@ -10,7 +10,9 @@ use itertools::Itertools;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use parking_lot::RwLock;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use strum::IntoStaticStr;
use tracing::{debug, instrument, warn};
use types::{Epoch, EthSpec, Hash256, Slot};
@@ -91,7 +93,11 @@ pub struct SyncingChain<T: BeaconChainTypes> {
///
/// Also, For each peer tracks the total requests done per peer as part of this SyncingChain
/// `HashMap<peer, total_requests_per_peer>`
peers: HashMap<PeerId, usize>,
peers: Arc<RwLock<HashSet<PeerId>>>,
/// Tracks the total requests done to each peer for this SyncingChain. Forces us to fetch data
/// from all peers to prevent eclipse attacks
requests_per_peer: HashMap<PeerId, usize>,
/// Starting epoch of the next batch that needs to be downloaded.
to_be_downloaded: BatchId,
@@ -173,7 +179,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot,
target_head_root,
batches: BTreeMap::new(),
peers: HashMap::from_iter([(peer_id, <_>::default())]),
peers: Arc::new(RwLock::new(HashSet::from_iter([peer_id]))),
requests_per_peer: HashMap::from_iter([(peer_id, <_>::default())]),
to_be_downloaded: start_epoch,
processing_target: start_epoch,
optimistic_start: None,
@@ -191,7 +198,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Check if the chain has peers from which to process batches.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn available_peers(&self) -> usize {
self.peers.len()
self.peers.read().len()
}
/// Get the chain's id.
@@ -203,7 +210,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Peers currently syncing this chain.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn peers(&self) -> impl Iterator<Item = PeerId> + '_ {
self.peers.keys().cloned()
self.peers
.read()
.iter()
.copied()
.collect::<Vec<_>>()
.into_iter()
}
/// Progress in epochs made by the chain
@@ -227,9 +239,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// If the peer has active batches, those are considered failed and re-requested.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);
self.peers.write().remove(peer_id);
self.requests_per_peer.remove(peer_id);
if self.peers.is_empty() {
if self.peers.read().is_empty() {
Err(RemoveChain::EmptyPeerPool)
} else {
Ok(KeepChain)
@@ -259,7 +272,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Account for one more requests to this peer
// TODO(das): this code assumes that we do a single request per peer per RpcBlock
for peer in batch_peers.iter_unique_peers() {
*self.peers.entry(*peer).or_default() += 1;
*self.requests_per_peer.entry(*peer).or_default() += 1;
}
// check if we have this batch
@@ -613,7 +626,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"Batch failed to download. Dropping chain scoring peers"
);
for (peer, _) in self.peers.drain() {
for peer in self.peers.write().drain() {
network.report_peer(peer, penalty, "faulty_chain");
}
Err(RemoveChain::ChainFailed {
@@ -878,7 +891,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
network: &mut SyncNetworkContext<T>,
peer_id: PeerId,
) -> ProcessingResult {
self.peers.insert(peer_id, <_>::default());
self.peers.write().insert(peer_id);
self.requests_per_peer.insert(peer_id, <_>::default());
self.request_batches(network)
}
@@ -952,26 +966,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers();
// TODO(das): we should request only from peers that are part of this SyncingChain.
// However, then we hit the NoPeer error frequently which causes the batch to fail and
// the SyncingChain to be dropped. We need to handle this case more gracefully.
let synced_peers = network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();
match network.block_components_by_range_request(
request,
RangeRequestId::RangeSync {
chain_id: self.id,
batch_id,
},
&synced_peers,
self.peers.clone(),
&failed_peers,
&self.peers,
&self.requests_per_peer,
) {
Ok(request_id) => {
// inform the batch about the new request