Make range sync peer loadbalancing PeerDAS-friendly (#6922)

- Re-opens https://github.com/sigp/lighthouse/pull/6864 targeting unstable

Range sync and backfill sync still assume that each batch request is done by a single peer. This assumption breaks with PeerDAS, where we request custody columns to N peers.

Issues with current unstable:

- Peer prioritization counts batch requests per peer. This accounting is broken now, data columns by range request are not accounted
- Peer selection for data columns by range ignores the set of peers on a syncing chain, instead draws from the global pool of peers
- The implementation is very strict when we have no peers to request from. After PeerDAS this case is very common and we want to be flexible or easy and handle that case better than just hard failing everything.


  - [x] Upstream peer prioritization to the network context, it knows exactly how many active requests a peer (including columns by range)
- [x] Upstream peer selection to the network context, now `block_components_by_range_request` gets a set of peers to choose from instead of a single peer. If it can't find a peer, it returns the error `RpcRequestSendError::NoPeer`
- [ ] Range sync and backfill sync handle `RpcRequestSendError::NoPeer` explicitly
- [ ] Range sync: leaves the batch in `AwaitingDownload` state and does nothing. **TODO**: we should have some mechanism to fail the chain if it's stale for too long - **EDIT**: Not done in this PR
- [x] Backfill sync: pauses the sync until another peer joins - **EDIT**: Same logic as unstable

### TODOs

- [ ] Add tests :)
- [x] Manually test backfill sync

Note: this touches the mainnet path!
This commit is contained in:
Lion - dapplion
2025-05-06 23:03:07 -03:00
committed by GitHub
parent 43c38a6fa0
commit beb0ce68bd
12 changed files with 541 additions and 472 deletions

View File

@@ -2,16 +2,13 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::{RangeRequestId, RpcResponseError};
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use rand::seq::SliceRandom;
use rand::Rng;
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use strum::IntoStaticStr;
use tracing::{debug, instrument, warn};
@@ -91,7 +88,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain
/// and thus available to download this chain from, as well as the batches we are currently
/// requesting.
peers: FnvHashMap<PeerId, HashSet<BatchId>>,
peers: HashSet<PeerId>,
/// Starting epoch of the next batch that needs to be downloaded.
to_be_downloaded: BatchId,
@@ -133,9 +130,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer_id: PeerId,
chain_type: SyncingChainType,
) -> Self {
let mut peers = FnvHashMap::default();
peers.insert(peer_id, Default::default());
SyncingChain {
id,
chain_type,
@@ -143,7 +137,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot,
target_head_root,
batches: BTreeMap::new(),
peers,
peers: HashSet::from_iter([peer_id]),
to_be_downloaded: start_epoch,
processing_target: start_epoch,
optimistic_start: None,
@@ -173,7 +167,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Peers currently syncing this chain.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn peers(&self) -> impl Iterator<Item = PeerId> + '_ {
self.peers.keys().cloned()
self.peers.iter().cloned()
}
/// Progress in epochs made by the chain
@@ -196,29 +190,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches.
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(%peer_id, batch = ?id, "Batch not found while removing peer")
}
}
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);
if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)
@@ -270,11 +243,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.peers
.get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id));
let received = batch.download_completed(blocks)?;
// TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258
let received = batch.download_completed(blocks, *peer_id)?;
let awaiting_batches = batch_id
.saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
/ EPOCHS_PER_BATCH;
@@ -476,7 +447,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
};
let peer = batch.current_peer().cloned().ok_or_else(|| {
let peer = batch.processing_peer().cloned().ok_or_else(|| {
RemoveChain::WrongBatchState(format!(
"Processing target is in wrong state: {:?}",
batch.state(),
@@ -582,7 +553,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"Batch failed to download. Dropping chain scoring peers"
);
for (peer, _) in self.peers.drain() {
for peer in self.peers.drain() {
network.report_peer(peer, *penalty, "faulty_chain");
}
Err(RemoveChain::ChainFailed {
@@ -595,7 +566,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
BatchProcessResult::NonFaultyFailure => {
batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;
// Simply redownload the batch.
self.retry_batch_download(network, batch_id)
self.send_batch(network, batch_id)
}
}
}
@@ -616,7 +587,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(%epoch, reason, "Rejected optimistic batch left for future use");
// this batch is now treated as any other batch, and re-requested for future use
if redownload {
return self.retry_batch_download(network, epoch);
return self.send_batch(network, epoch);
}
} else {
debug!(%epoch, reason, "Rejected optimistic batch");
@@ -696,12 +667,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
}
BatchState::Downloading(peer, ..) => {
// remove this batch from the peer's active requests
if let Some(active_batches) = self.peers.get_mut(peer) {
active_batches.remove(&id);
}
}
BatchState::Downloading(..) => {}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
crit!("batch indicates inconsistent chain state while advancing chain")
}
@@ -790,10 +756,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.processing_target = self.start_epoch;
for id in redownload_queue {
self.retry_batch_download(network, id)?;
self.send_batch(network, id)?;
}
// finally, re-request the failed batch.
self.retry_batch_download(network, batch_id)
self.send_batch(network, batch_id)
}
pub fn stop_syncing(&mut self) {
@@ -849,13 +815,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
network: &mut SyncNetworkContext<T>,
peer_id: PeerId,
) -> ProcessingResult {
// add the peer without overwriting its active requests
if self.peers.entry(peer_id).or_default().is_empty() {
// Either new or not, this peer is idle, try to request more batches
self.request_batches(network)
} else {
Ok(KeepChain)
}
self.peers.insert(peer_id);
self.request_batches(network)
}
/// An RPC error has occurred.
@@ -896,16 +857,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
%request_id,
"Batch download error"
);
if let Some(active_requests) = self.peers.get_mut(peer_id) {
active_requests.remove(&batch_id);
}
if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(Some(*peer_id))?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: batch_id,
});
}
self.retry_batch_download(network, batch_id)
self.send_batch(network, batch_id)
} else {
debug!(
batch_epoch = %batch_id,
@@ -919,66 +879,42 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
/// Sends and registers the request of a batch awaiting download.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn retry_batch_download(
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
) -> ProcessingResult {
let Some(batch) = self.batches.get_mut(&batch_id) else {
return Ok(KeepChain);
};
// Find a peer to request the batch
let failed_peers = batch.failed_peers();
let new_peer = self
.peers
.iter()
.map(|(peer, requests)| {
(
failed_peers.contains(peer),
requests.len(),
rand::thread_rng().gen::<u32>(),
*peer,
)
})
// Sort peers prioritizing unrelated peers with less active requests.
.min()
.map(|(_, _, _, peer)| peer);
if let Some(peer) = new_peer {
self.send_batch(network, batch_id, peer)
} else {
// If we are here the chain has no more peers
Err(RemoveChain::EmptyPeerPool)
}
}
/// Requests the batch assigned to the given id from a given peer.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn send_batch(
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
peer: PeerId,
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_peers();
// TODO(das): we should request only from peers that are part of this SyncingChain.
// However, then we hit the NoPeer error frequently which causes the batch to fail and
// the SyncingChain to be dropped. We need to handle this case more gracefully.
let synced_peers = network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();
match network.block_components_by_range_request(
peer,
batch_type,
request,
RangeRequestId::RangeSync {
chain_id: self.id,
batch_id,
},
&synced_peers,
&failed_peers,
) {
Ok(request_id) => {
// inform the batch about the new request
batch.start_downloading_from_peer(peer, request_id)?;
batch.start_downloading(request_id)?;
if self
.optimistic_start
.map(|epoch| epoch == batch_id)
@@ -988,41 +924,34 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
} else {
debug!(epoch = %batch_id, %batch, %batch_state, "Requesting batch");
}
// register the batch for this peer
return self
.peers
.get_mut(&peer)
.map(|requests| {
requests.insert(batch_id);
Ok(KeepChain)
})
.unwrap_or_else(|| {
Err(RemoveChain::WrongChainState(format!(
"Sending batch to a peer that is not in the chain: {}",
peer
)))
});
return Ok(KeepChain);
}
Err(e) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = %e, %batch, "Could not send batch request");
// register the failed download and check if the batch can be retried
batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant
self.peers
.get_mut(&peer)
.map(|request| request.remove(&batch_id));
match batch.download_failed(true)? {
BatchOperationOutcome::Failed { blacklist } => {
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: batch_id,
})
}
BatchOperationOutcome::Continue => {
return self.retry_batch_download(network, batch_id)
Err(e) => match e {
// TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For
// sync to work properly it must be okay to have "stalled" batches in
// AwaitingDownload state. Currently it will error with invalid state if
// that happens. Sync manager must periodicatlly prune stalled batches like
// we do for lookup sync. Then we can deprecate the redundant
// `good_peers_on_sampling_subnets` checks.
e
@ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request");
// register the failed download and check if the batch can be retried
batch.start_downloading(1)?; // fake request_id = 1 is not relevant
match batch.download_failed(None)? {
BatchOperationOutcome::Failed { blacklist } => {
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: batch_id,
})
}
BatchOperationOutcome::Continue => {
return self.send_batch(network, batch_id)
}
}
}
}
},
}
}
@@ -1061,21 +990,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// find the next pending batch and request it from the peer
// randomize the peers for load balancing
let mut rng = rand::thread_rng();
let mut idle_peers = self
.peers
.iter()
.filter_map(|(peer, requests)| {
if requests.is_empty() {
Some(*peer)
} else {
None
}
})
.collect::<Vec<_>>();
idle_peers.shuffle(&mut rng);
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
@@ -1085,26 +999,25 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
entry.insert(optimistic_batch);
self.send_batch(network, epoch, peer)?;
}
let batch_type = network.batch_type(epoch);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
entry.insert(optimistic_batch);
self.send_batch(network, epoch)?;
}
return Ok(KeepChain);
}
while let Some(peer) = idle_peers.pop() {
if let Some(batch_id) = self.include_next_batch(network) {
// send the batch
self.send_batch(network, batch_id, peer)?;
} else {
// No more batches, simply stop
return Ok(KeepChain);
}
// find the next pending batch and request it from the peer
// Note: for this function to not infinite loop we must:
// - If `include_next_batch` returns Some we MUST increase the count of batches that are
// accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of
// that function.
while let Some(batch_id) = self.include_next_batch(network) {
// send the batch
self.send_batch(network, batch_id)?;
}
// No more batches, simply stop
Ok(KeepChain)
}
@@ -1149,6 +1062,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
{
return None;
}
// only request batches up to the buffer size limit
// NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync
// if the current processing window is contained in a long range of skip slots.
@@ -1177,19 +1091,20 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
let batch_id = self.to_be_downloaded;
// If no batch needs a retry, attempt to send the batch of the next epoch to download
let next_batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
match self.batches.entry(next_batch_id) {
Entry::Occupied(_) => {
// this batch doesn't need downloading, let this same function decide the next batch
self.to_be_downloaded += EPOCHS_PER_BATCH;
self.include_next_batch(network)
}
Entry::Vacant(entry) => {
let batch_type = network.batch_type(batch_id);
entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type));
let batch_type = network.batch_type(next_batch_id);
entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type));
self.to_be_downloaded += EPOCHS_PER_BATCH;
Some(batch_id)
Some(next_batch_id)
}
}
}