From beb0ce68bdf62a417a0dba8f11e907bf1c2d6f17 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 6 May 2025 23:03:07 -0300 Subject: [PATCH] Make range sync peer loadbalancing PeerDAS-friendly (#6922) - Re-opens https://github.com/sigp/lighthouse/pull/6864 targeting unstable Range sync and backfill sync still assume that each batch request is done by a single peer. This assumption breaks with PeerDAS, where we request custody columns from N peers. Issues with current unstable: - Peer prioritization counts batch requests per peer. This accounting is broken now: data columns by range requests are not accounted for - Peer selection for data columns by range ignores the set of peers on a syncing chain, instead it draws from the global pool of peers - The implementation is very strict when we have no peers to request from. After PeerDAS this case is very common, and we want to be flexible and handle that case better than just hard failing everything. - [x] Upstream peer prioritization to the network context, which knows exactly how many active requests a peer has (including columns by range) - [x] Upstream peer selection to the network context, now `block_components_by_range_request` gets a set of peers to choose from instead of a single peer. If it can't find a peer, it returns the error `RpcRequestSendError::NoPeer` - [ ] Range sync and backfill sync handle `RpcRequestSendError::NoPeer` explicitly - [ ] Range sync: leaves the batch in `AwaitingDownload` state and does nothing. **TODO**: we should have some mechanism to fail the chain if it's stale for too long - **EDIT**: Not done in this PR - [x] Backfill sync: pauses the sync until another peer joins - **EDIT**: Same logic as unstable ### TODOs - [ ] Add tests :) - [x] Manually test backfill sync Note: this touches the mainnet path! 
--- .../src/peer_manager/peerdb.rs | 19 +- .../lighthouse_network/src/types/globals.rs | 14 + .../src/network_beacon_processor/mod.rs | 2 +- .../network/src/sync/backfill_sync/mod.rs | 303 +++++++--------- beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/network_context.rs | 331 +++++++++++++----- .../src/sync/network_context/custody.rs | 32 +- .../src/sync/network_context/requests.rs | 4 + .../network/src/sync/range_sync/batch.rs | 43 +-- .../network/src/sync/range_sync/chain.rs | 249 +++++-------- .../network/src/sync/range_sync/range.rs | 5 +- beacon_node/network/src/sync/tests/lookups.rs | 7 +- 12 files changed, 541 insertions(+), 472 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 083887046a..95a4e82fa2 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,6 +1,8 @@ use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; use crate::discovery::{peer_id_to_node_id, CombinedKey}; -use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId}; +use crate::{ + metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo, +}; use itertools::Itertools; use logging::crit; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; @@ -15,7 +17,7 @@ use std::{ use sync_status::SyncStatus; use tracing::{debug, error, trace, warn}; use types::data_column_custody_group::compute_subnets_for_node; -use types::{ChainSpec, DataColumnSubnetId, EthSpec}; +use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot}; pub mod client; pub mod peer_info; @@ -735,6 +737,19 @@ impl PeerDB { }, ); + self.update_sync_status( + &peer_id, + SyncStatus::Synced { + // Fill in mock SyncInfo, only for the peer to return `is_synced() == true`. 
+ info: SyncInfo { + head_slot: Slot::new(0), + head_root: Hash256::ZERO, + finalized_epoch: Epoch::new(0), + finalized_root: Hash256::ZERO, + }, + }, + ); + if supernode { let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); let all_subnets = (0..spec.data_column_sidecar_subnet_count) diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 3031a0dff7..fd99d93589 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -206,6 +206,20 @@ impl NetworkGlobals { .collect::>() } + /// Returns true if the peer is known and is a custodian of `column_index` + pub fn is_custody_peer_of(&self, column_index: ColumnIndex, peer_id: &PeerId) -> bool { + self.peers + .read() + .peer_info(peer_id) + .map(|info| { + info.is_assigned_to_custody_subnet(&DataColumnSubnetId::from_column_index( + column_index, + &self.spec, + )) + }) + .unwrap_or(false) + } + /// Returns the TopicConfig to compute the set of Gossip topics for a given fork pub fn as_topic_config(&self) -> TopicConfig { TopicConfig { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9a8edbfa4c..cfd5c24f99 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1141,7 +1141,7 @@ use { }; #[cfg(test)] -type TestBeaconChainType = +pub(crate) type TestBeaconChainType = Witness, E, MemoryStore, MemoryStore>; #[cfg(test)] diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 509caf7316..fcef06271f 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -10,7 +10,9 @@ use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::manager::BatchProcessResult; -use 
crate::sync::network_context::{RangeRequestId, RpcResponseError, SyncNetworkContext}; +use crate::sync::network_context::{ + RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, +}; use crate::sync::range_sync::{ BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, }; @@ -20,10 +22,9 @@ use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; -use rand::seq::SliceRandom; use std::collections::{ btree_map::{BTreeMap, Entry}, - HashMap, HashSet, + HashSet, }; use std::sync::Arc; use tracing::{debug, error, info, instrument, warn}; @@ -121,9 +122,6 @@ pub struct BackFillSync { /// Sorted map of batches undergoing some kind of processing. batches: BTreeMap>, - /// List of peers we are currently awaiting a response for. - active_requests: HashMap>, - /// The current processing batch, if any. current_processing_batch: Option, @@ -176,7 +174,6 @@ impl BackFillSync { let bfs = BackFillSync { batches: BTreeMap::new(), - active_requests: HashMap::new(), processing_target: current_start, current_start, last_batch_downloaded: false, @@ -314,45 +311,11 @@ impl BackFillSync { skip_all )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { if matches!(self.state(), BackFillState::Failed) { return Ok(()); } - if let Some(batch_ids) = self.active_requests.remove(peer_id) { - // fail the batches. 
- for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - match batch.download_failed(false) { - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(id))?; - } - Ok(BatchOperationOutcome::Continue) => {} - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; - } - } - // If we have run out of peers in which to retry this batch, the backfill state - // transitions to a paused state. - // We still need to reset the state for all the affected batches, so we should not - // short circuit early. - if self.retry_batch_download(network, id).is_err() { - debug!( - batch_id = %id, - error = "no synced peers", - "Batch could not be retried" - ); - } - } else { - debug!(peer = %peer_id, batch = %id, "Batch not found while removing peer"); - } - } - } - // Remove the peer from the participation list self.participating_peers.remove(peer_id); Ok(()) @@ -386,15 +349,12 @@ impl BackFillSync { return Ok(()); } debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); - if let Some(active_requests) = self.active_requests.get_mut(peer_id) { - active_requests.remove(&batch_id); - } - match batch.download_failed(true) { + match batch.download_failed(Some(*peer_id)) { Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)), Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)) } - Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id), } } else { // this could be an error for an old batch, removed when the chain advances @@ -435,19 +395,11 @@ impl BackFillSync { // sending an error /timeout) if the peer is removed from the chain for other // reasons. 
Check that this block belongs to the expected peer, and that the // request_id matches - // TODO(das): removed peer_id matching as the node may request a different peer for data - // columns. if !batch.is_expecting_block(&request_id) { return Ok(ProcessResult::Successful); } - // A stream termination has been sent. This batch has ended. Process a completed batch. - // Remove the request from the peer's active batches - self.active_requests - .get_mut(peer_id) - .map(|active_requests| active_requests.remove(&batch_id)); - - match batch.download_completed(blocks) { + match batch.download_completed(blocks, *peer_id) { Ok(received) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; @@ -488,7 +440,6 @@ impl BackFillSync { self.set_state(BackFillState::Failed); // Remove all batches and active requests and participating peers. self.batches.clear(); - self.active_requests.clear(); self.participating_peers.clear(); self.restart_failed_sync = false; @@ -622,7 +573,7 @@ impl BackFillSync { } }; - let Some(peer) = batch.current_peer() else { + let Some(peer) = batch.processing_peer() else { self.fail_sync(BackFillError::BatchInvalidState( batch_id, String::from("Peer does not exist"), @@ -698,6 +649,8 @@ impl BackFillSync { ); for peer in self.participating_peers.drain() { + // TODO(das): `participating_peers` only includes block peers. Should we + // penalize the custody column peers too? network.report_peer(peer, *penalty, "backfill_batch_failed"); } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) @@ -723,7 +676,7 @@ impl BackFillSync { { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } - self.retry_batch_download(network, batch_id)?; + self.send_batch(network, batch_id)?; Ok(ProcessResult::Successful) } } @@ -864,12 +817,7 @@ impl BackFillSync { } } } - BatchState::Downloading(peer, ..) 
=> { - // remove this batch from the peer's active requests - if let Some(active_requests) = self.active_requests.get_mut(peer) { - active_requests.remove(&id); - } - } + BatchState::Downloading(..) => {} BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { crit!("batch indicates inconsistent chain state while advancing chain") } @@ -951,57 +899,10 @@ impl BackFillSync { self.processing_target = self.current_start; for id in redownload_queue { - self.retry_batch_download(network, id)?; + self.send_batch(network, id)?; } // finally, re-request the failed batch. - self.retry_batch_download(network, batch_id) - } - - /// Sends and registers the request of a batch awaiting download. - #[instrument(parent = None, - level = "info", - fields(service = "backfill_sync"), - name = "backfill_sync", - skip_all - )] - fn retry_batch_download( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - ) -> Result<(), BackFillError> { - let Some(batch) = self.batches.get_mut(&batch_id) else { - return Ok(()); - }; - - // Find a peer to request the batch - let failed_peers = batch.failed_peers(); - - let new_peer = self - .network_globals - .peers - .read() - .synced_peers() - .map(|peer| { - ( - failed_peers.contains(peer), - self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0), - rand::random::(), - *peer, - ) - }) - // Sort peers prioritizing unrelated peers with less active requests. - .min() - .map(|(_, _, _, peer)| peer); - - if let Some(peer) = new_peer { - self.participating_peers.insert(peer); - self.send_batch(network, batch_id, peer) - } else { - // If we are here the chain has no more synced peers - info!(reason = "insufficient_synced_peers", "Backfill sync paused"); - self.set_state(BackFillState::Paused); - Err(BackFillError::Paused) - } + self.send_batch(network, batch_id) } /// Requests the batch assigned to the given id from a given peer. 
@@ -1015,53 +916,65 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer: PeerId, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + let (request, is_blob_batch) = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( - peer, is_blob_batch, request, RangeRequestId::BackfillSync { batch_id }, + &synced_peers, + &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request - if let Err(e) = batch.start_downloading_from_peer(peer, request_id) { + if let Err(e) = batch.start_downloading(request_id) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); } debug!(epoch = %batch_id, %batch, "Requesting batch"); - // register the batch for this peer - self.active_requests - .entry(peer) - .or_default() - .insert(batch_id); return Ok(()); } - Err(e) => { - // NOTE: under normal conditions this shouldn't happen but we handle it anyway - warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); - // register the failed download and check if the batch can be retried - if let Err(e) = batch.start_downloading_from_peer(peer, 1) { - return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + Err(e) => match e { + RpcRequestSendError::NoPeer(no_peer) => { + // If we are here the chain has no more synced peers + info!( + "reason" = format!("insufficient_synced_peers({no_peer:?})"), + "Backfill sync paused" + ); + self.set_state(BackFillState::Paused); + return Err(BackFillError::Paused); } - self.active_requests - .get_mut(&peer) - .map(|request| request.remove(&batch_id)); + RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch,"Could not send batch 
request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } - match batch.download_failed(true) { - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? - } - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? - } - Ok(BatchOperationOutcome::Continue) => { - return self.retry_batch_download(network, batch_id) + match batch.download_failed(None) { + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id) + } } } - } + }, } } @@ -1093,7 +1006,7 @@ impl BackFillSync { .collect::>(); for batch_id in batch_ids_to_retry { - self.retry_batch_download(network, batch_id)?; + self.send_batch(network, batch_id)?; } Ok(()) } @@ -1115,34 +1028,16 @@ impl BackFillSync { } // find the next pending batch and request it from the peer - - // randomize the peers for load balancing - let mut rng = rand::thread_rng(); - let mut idle_peers = self - .network_globals - .peers - .read() - .synced_peers() - .filter(|peer_id| { - self.active_requests - .get(peer_id) - .map(|requests| requests.is_empty()) - .unwrap_or(true) - }) - .cloned() - .collect::>(); - - idle_peers.shuffle(&mut rng); - - while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch(network) { - // send the batch - self.send_batch(network, batch_id, peer)?; - } else { - // No more batches, simply stop - return Ok(()); - } + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the 
`BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; } + + // No more batches, simply stop Ok(()) } @@ -1296,3 +1191,73 @@ enum ResetEpochError { /// The chain has already completed. SyncCompleted, } + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::test_utils::BeaconChainHarness; + use bls::Hash256; + use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus}; + use rand::prelude::StdRng; + use rand::SeedableRng; + use types::MinimalEthSpec; + + #[test] + fn request_batches_should_not_loop_infinitely() { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(4) + .fresh_ephemeral_store() + .build(); + + let beacon_chain = harness.chain.clone(); + let slots_per_epoch = MinimalEthSpec::slots_per_epoch(); + + let network_globals = Arc::new(NetworkGlobals::new_test_globals( + vec![], + Arc::new(NetworkConfig::default()), + beacon_chain.spec.clone(), + )); + + { + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let peer_id = network_globals + .peers + .write() + .__add_connected_peer_testing_only( + true, + &beacon_chain.spec, + k256::ecdsa::SigningKey::random(&mut rng).into(), + ); + + // Simulate finalized epoch and head being 2 epochs ahead + let finalized_epoch = Epoch::new(40); + let head_epoch = finalized_epoch + 2; + let head_slot = head_epoch.start_slot(slots_per_epoch) + 1; + + network_globals.peers.write().update_sync_status( + &peer_id, + SyncStatus::Synced { + info: SyncInfo { + head_slot, + head_root: Hash256::random(), + finalized_epoch, + finalized_root: Hash256::random(), + }, + }, + ); + } + + let mut network = SyncNetworkContext::new_for_testing( + beacon_chain.clone(), + network_globals.clone(), + harness.runtime.task_executor.clone(), + ); + + let mut backfill = BackFillSync::new(beacon_chain, network_globals); + 
backfill.set_state(BackFillState::Syncing); + + // if this ends up running into an infinite loop, the test will overflow the stack pretty quickly. + let _ = backfill.request_batches(&mut network); + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 84e492c04f..9119b1652c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -515,9 +515,7 @@ impl SyncManager { // Remove peer from all data structures self.range_sync.peer_disconnect(&mut self.network, peer_id); - let _ = self - .backfill_sync - .peer_disconnected(peer_id, &mut self.network); + let _ = self.backfill_sync.peer_disconnected(peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2cb5ec9a0a..d9eda651e7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -9,6 +9,8 @@ use super::range_sync::ByRangeRequestType; use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; +#[cfg(test)] +use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; @@ -27,18 +29,20 @@ use lighthouse_network::service::api_types::{ }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; -use rand::prelude::IteratorRandom; -use rand::thread_rng; pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, }; +#[cfg(test)] +use slot_clock::SlotClock; use std::collections::hash_map::Entry; use 
std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; +#[cfg(test)] +use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{debug, error, span, warn, Level}; use types::blob_sidecar::FixedBlobSidecarList; @@ -82,24 +86,18 @@ pub enum RpcResponseError { #[derive(Debug, PartialEq, Eq)] pub enum RpcRequestSendError { - /// Network channel send failed - NetworkSendError, - NoCustodyPeers, - CustodyRequestError(custody::Error), - SlotClockError, + /// No peer available matching the required criteria + NoPeer(NoPeerError), + /// These errors should never happen, including unreachable custody errors or network send + /// errors. + InternalError(String), } -impl std::fmt::Display for RpcRequestSendError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - RpcRequestSendError::NetworkSendError => write!(f, "Network send error"), - RpcRequestSendError::NoCustodyPeers => write!(f, "No custody peers"), - RpcRequestSendError::CustodyRequestError(e) => { - write!(f, "Custody request error: {:?}", e) - } - RpcRequestSendError::SlotClockError => write!(f, "Slot clock error"), - } - } +/// Type of peer missing that caused a `RpcRequestSendError::NoPeers` +#[derive(Debug, PartialEq, Eq)] +pub enum NoPeerError { + BlockPeer, + CustodyPeer(ColumnIndex), } #[derive(Debug, PartialEq, Eq)] @@ -232,6 +230,35 @@ pub enum RangeBlockComponent { ), } +#[cfg(test)] +impl SyncNetworkContext> { + pub fn new_for_testing( + beacon_chain: Arc>>, + network_globals: Arc>, + task_executor: TaskExecutor, + ) -> Self { + let fork_context = Arc::new(ForkContext::new::( + beacon_chain.slot_clock.now().unwrap_or(Slot::new(0)), + beacon_chain.genesis_validators_root, + &beacon_chain.spec, + )); + let (network_tx, _network_rx) = mpsc::unbounded_channel(); + let (beacon_processor, _) = NetworkBeaconProcessor::null_for_testing( + network_globals, + mpsc::unbounded_channel().0, + beacon_chain.clone(), + 
task_executor, + ); + + SyncNetworkContext::new( + network_tx, + Arc::new(beacon_processor), + beacon_chain, + fork_context, + ) + } +} + impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -331,12 +358,6 @@ impl SyncNetworkContext { .custody_peers_for_column(column_index) } - pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option { - self.get_custodial_peers(column_index) - .into_iter() - .choose(&mut thread_rng()) - } - pub fn network_globals(&self) -> &NetworkGlobals { &self.network_beacon_processor.network_globals } @@ -381,34 +402,102 @@ impl SyncNetworkContext { } } + fn active_request_count_by_peer(&self) -> HashMap { + let Self { + network_send: _, + request_id: _, + blocks_by_root_requests, + blobs_by_root_requests, + data_columns_by_root_requests, + blocks_by_range_requests, + blobs_by_range_requests, + data_columns_by_range_requests, + // custody_by_root_requests is a meta request of data_columns_by_root_requests + custody_by_root_requests: _, + // components_by_range_requests is a meta request of various _by_range requests + components_by_range_requests: _, + execution_engine_state: _, + network_beacon_processor: _, + chain: _, + fork_context: _, + // Don't use a fallback match. 
We want to be sure that all requests are considered when + // adding new ones + } = self; + + let mut active_request_count_by_peer = HashMap::::new(); + + for peer_id in blocks_by_root_requests + .iter_request_peers() + .chain(blobs_by_root_requests.iter_request_peers()) + .chain(data_columns_by_root_requests.iter_request_peers()) + .chain(blocks_by_range_requests.iter_request_peers()) + .chain(blobs_by_range_requests.iter_request_peers()) + .chain(data_columns_by_range_requests.iter_request_peers()) + { + *active_request_count_by_peer.entry(peer_id).or_default() += 1; + } + + active_request_count_by_peer + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, - peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, requester: RangeRequestId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + + let Some(block_peer) = peers + .iter() + .map(|peer| { + ( + // If contains -> 1 (order after), not contains -> 0 (order first) + peers_to_deprioritize.contains(peer), + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, _, peer)| *peer) + else { + // Backfill and forward sync handle this condition gracefully. + // - Backfill sync: will pause waiting for more peers to join + // - Forward sync: can never happen as the chain is dropped when removing the last peer. 
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer)); + }; + + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = + if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { + let column_indexes = self.network_globals().sampling_columns.clone(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; + // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { id: self.next_id(), requester, }; - // Compute custody column peers before sending the blocks_by_range request. If we don't have - // enough peers, error here. - let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns.clone(); - Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?) 
- } else { - None - }; - - let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?; + let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?; let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { Some(self.send_blobs_by_range_request( - peer_id, + block_peer, BlobsByRangeRequest { start_slot: *request.start_slot(), count: *request.count(), @@ -419,64 +508,98 @@ impl SyncNetworkContext { None }; - let data_columns = if let Some(data_column_requests) = data_column_requests { - let data_column_requests = data_column_requests - .into_iter() - .map(|(peer_id, columns_by_range_request)| { - self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id) - }) - .collect::, _>>()?; + let data_column_requests = columns_by_range_peers_to_request + .map(|columns_by_range_peers_to_request| { + columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_request( + peer_id, + DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns, + }, + id, + ) + }) + .collect::, _>>() + }) + .transpose()?; - Some(( - data_column_requests, - self.network_globals() - .sampling_columns - .iter() - .cloned() - .collect::>(), - )) - } else { - None - }; - - let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns); + let info = RangeBlockComponentsRequest::new( + blocks_req_id, + blobs_req_id, + data_column_requests.map(|data_column_requests| { + ( + data_column_requests, + self.network_globals() + .sampling_columns + .clone() + .iter() + .copied() + .collect(), + ) + }), + ); self.components_by_range_requests.insert(id, info); Ok(id.id) } - fn make_columns_by_range_requests( + fn select_columns_by_range_peers_to_request( &self, - request: BlocksByRangeRequest, custody_indexes: &HashSet, - ) -> Result, RpcRequestSendError> { - let mut peer_id_to_request_map = 
HashMap::new(); + peers: &HashSet, + active_request_count_by_peer: HashMap, + peers_to_deprioritize: &HashSet, + ) -> Result>, RpcRequestSendError> { + let mut columns_to_request_by_peer = HashMap::>::new(); for column_index in custody_indexes { - // TODO(das): The peer selection logic here needs to be improved - we should probably - // avoid retrying from failed peers, however `BatchState` currently only tracks the peer - // serving the blocks. - let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else { + // Strictly consider peers that are custodials of this column AND are part of this + // syncing chain. If the forward range sync chain has few peers, it's likely that this + // function will not be able to find peers on our custody columns. + let Some(custody_peer) = peers + .iter() + .filter(|peer| { + self.network_globals() + .is_custody_peer_of(*column_index, peer) + }) + .map(|peer| { + ( + // If contains -> 1 (order after), not contains -> 0 (order first) + peers_to_deprioritize.contains(peer), + // Prefer peers with less overall requests + // Also account for requests that are not yet issued tracked in peer_id_to_request_map + // We batch requests to the same peer, so count existance in the + // `columns_to_request_by_peer` as a single 1 request. + active_request_count_by_peer.get(peer).copied().unwrap_or(0) + + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, _, peer)| *peer) + else { // TODO(das): this will be pretty bad UX. To improve we should: - // - Attempt to fetch custody requests first, before requesting blocks // - Handle the no peers case gracefully, maybe add some timeout and give a few // minutes / seconds to the peer manager to locate peers on this subnet before // abandoing progress on the chain completely. 
- return Err(RpcRequestSendError::NoCustodyPeers); + return Err(RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer( + *column_index, + ))); }; - let columns_by_range_request = peer_id_to_request_map + columns_to_request_by_peer .entry(custody_peer) - .or_insert_with(|| DataColumnsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - columns: vec![], - }); - - columns_by_range_request.columns.push(*column_index); + .or_default() + .push(*column_index); } - Ok(peer_id_to_request_map) + Ok(columns_to_request_by_peer) } /// Received a blocks by range or blobs by range response for a request that couples blocks ' @@ -536,11 +659,21 @@ impl SyncNetworkContext { lookup_peers: Arc>>, block_root: Hash256, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() .iter() - .choose(&mut rand::thread_rng()) - .copied() + .map(|peer| { + ( + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) else { // Allow lookup to not have any peers and do nothing. 
This is an optimization to not // lose progress of lookups created from a block with unknown parent before we receive @@ -597,7 +730,7 @@ impl SyncNetworkContext { request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)), app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlocksByRoot", @@ -632,11 +765,21 @@ impl SyncNetworkContext { block_root: Hash256, expected_blobs: usize, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() .iter() - .choose(&mut rand::thread_rng()) - .copied() + .map(|peer| { + ( + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) else { // Allow lookup to not have any peers and do nothing. 
This is an optimization to not // lose progress of lookups created from a block with unknown parent before we receive @@ -686,7 +829,7 @@ impl SyncNetworkContext { request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)), app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlobsByRoot", @@ -821,7 +964,25 @@ impl SyncNetworkContext { self.custody_by_root_requests.insert(requester, request); Ok(LookupRequestResult::RequestSent(id.req_id)) } - Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)), + Err(e) => Err(match e { + CustodyRequestError::NoPeer(column_index) => { + RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(column_index)) + } + // - TooManyFailures: Should never happen, `request` has just been created, it's + // count of download_failures is 0 here + // - BadState: Should never happen, a bad state can only happen when handling a + // network response + // - UnexpectedRequestId: Never happens: this Err is only constructed handling a + // download or processing response + // - SendFailed: Should never happen unless in a bad drop sequence when shutting + // down the node + e @ (CustodyRequestError::TooManyFailures + | CustodyRequestError::BadState { .. } + | CustodyRequestError::UnexpectedRequestId { .. } + | CustodyRequestError::SendFailed { .. 
}) => { + RpcRequestSendError::InternalError(format!("{e:?}")) + } + }), } } @@ -841,7 +1002,7 @@ impl SyncNetworkContext { request: RequestType::BlocksByRange(request.clone().into()), app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlocksByRange", @@ -882,7 +1043,7 @@ impl SyncNetworkContext { request: RequestType::BlobsByRange(request.clone()), app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlobsByRange", @@ -921,7 +1082,7 @@ impl SyncNetworkContext { request: RequestType::DataColumnsByRange(request.clone()), app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "DataColumnsByRange", diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index e7e6e62349..f4d010b881 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -45,7 +45,7 @@ pub enum Error { SendFailed(&'static str), TooManyFailures, BadState(String), - NoPeers(ColumnIndex), + NoPeer(ColumnIndex), /// Received a download result for a different request id than the in-flight request. /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. 
@@ -56,7 +56,6 @@ pub enum Error { } struct ActiveBatchColumnsRequest { - peer_id: PeerId, indices: Vec, } @@ -220,6 +219,7 @@ impl ActiveCustodyRequest { return Ok(Some((columns, peer_group, max_seen_timestamp))); } + let active_request_count_by_peer = cx.active_request_count_by_peer(); let mut columns_to_request_by_peer = HashMap::>::new(); let lookup_peers = self.lookup_peers.read(); @@ -238,15 +238,11 @@ impl ActiveCustodyRequest { // only query the peers on that fork. Should this case be handled? How to handle it? let custodial_peers = cx.get_custodial_peers(*column_index); - // TODO(das): cache this computation in a OneCell or similar to prevent having to - // run it every loop - let mut active_requests_by_peer = HashMap::::new(); - for batch_request in self.active_batch_columns_requests.values() { - *active_requests_by_peer - .entry(batch_request.peer_id) - .or_default() += 1; - } - + // We draw from the total set of peers, but prioritize those peers who we have + // received an attestation / status / block message claiming to have imported the + // lookup. The frequency of those messages is low, so drawing only from lookup_peers + // could cause many lookups to take much longer or fail as they don't have enough + // custody peers on a given column let mut priorized_peers = custodial_peers .iter() .map(|peer| { @@ -256,9 +252,12 @@ impl ActiveCustodyRequest { // De-prioritize peers that have failed to successfully respond to // requests recently self.failed_peers.contains(peer), - // Prefer peers with less requests to load balance across peers - active_requests_by_peer.get(peer).copied().unwrap_or(0), - // Final random factor to give all peers a shot in each retry + // Prefer peers with fewer requests to load balance across peers. + // We batch requests to the same peer, so count existence in the + // `columns_to_request_by_peer` as a single 1 request. 
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0) + + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties rand::thread_rng().gen::(), *peer, ) @@ -276,7 +275,7 @@ impl ActiveCustodyRequest { // `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that // lookup will naturally retry when other peers send us attestations for // descendants of this un-available lookup. - return Err(Error::NoPeers(*column_index)); + return Err(Error::NoPeer(*column_index)); } else { // Do not issue requests if there is no custody peer on this column } @@ -306,13 +305,14 @@ impl ActiveCustodyRequest { let column_request = self .column_requests .get_mut(column_index) + // Should never happen: column_index is iterated from column_requests .ok_or(Error::BadState("unknown column_index".to_owned()))?; column_request.on_download_start(req_id)?; } self.active_batch_columns_requests - .insert(req_id, ActiveBatchColumnsRequest { indices, peer_id }); + .insert(req_id, ActiveBatchColumnsRequest { indices }); } LookupRequestResult::NoRequestNeeded(_) => unreachable!(), LookupRequestResult::Pending(_) => unreachable!(), diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index c9b85e47b6..963b633ed6 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -179,6 +179,10 @@ impl ActiveRequests { .collect() } + pub fn iter_request_peers(&self) -> impl Iterator + '_ { + self.requests.values().map(|request| request.peer_id) + } + pub fn len(&self) -> usize { self.requests.len() } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index c1ad550376..264f83ee82 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -107,7 
+107,7 @@ pub struct BatchInfo { /// Number of processing attempts that have failed but we do not count. non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. - failed_download_attempts: Vec, + failed_download_attempts: Vec>, /// State of the batch. state: BatchState, /// Whether this batch contains all blocks or all blocks and blobs. @@ -132,7 +132,7 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Id), + Downloading(Id), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, Vec>, Instant), /// The batch is being processed. @@ -197,8 +197,8 @@ impl BatchInfo { peers.insert(attempt.peer_id); } - for download in &self.failed_download_attempts { - peers.insert(*download); + for peer in self.failed_download_attempts.iter().flatten() { + peers.insert(*peer); } peers @@ -206,18 +206,17 @@ impl BatchInfo { /// Verifies if an incoming block belongs to this batch. pub fn is_expecting_block(&self, request_id: &Id) -> bool { - if let BatchState::Downloading(_, expected_id) = &self.state { + if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } false } /// Returns the peer that is currently responsible for progressing the state of the batch. - pub fn current_peer(&self) -> Option<&PeerId> { + pub fn processing_peer(&self) -> Option<&PeerId> { match &self.state { - BatchState::AwaitingDownload | BatchState::Failed => None, - BatchState::Downloading(peer_id, _) - | BatchState::AwaitingProcessing(peer_id, _, _) + BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None, + BatchState::AwaitingProcessing(peer_id, _, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. 
}) => Some(peer_id), BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -276,9 +275,10 @@ impl BatchInfo { pub fn download_completed( &mut self, blocks: Vec>, + peer: PeerId, ) -> Result { match self.state.poison() { - BatchState::Downloading(peer, _request_id) => { + BatchState::Downloading(_) => { let received = blocks.len(); self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); Ok(received) @@ -297,19 +297,18 @@ impl BatchInfo { /// Mark the batch as failed and return whether we can attempt a re-download. /// /// This can happen if a peer disconnects or some error occurred that was not the peers fault. - /// THe `mark_failed` parameter, when set to false, does not increment the failed attempts of + /// The `peer` parameter, when set to None, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] pub fn download_failed( &mut self, - mark_failed: bool, + peer: Option, ) -> Result { match self.state.poison() { - BatchState::Downloading(peer, _request_id) => { + BatchState::Downloading(_) => { // register the attempt and check if the batch can be tried again - if mark_failed { - self.failed_download_attempts.push(peer); - } + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() >= B::max_batch_download_attempts() as usize { @@ -331,14 +330,10 @@ impl BatchInfo { } } - pub fn start_downloading_from_peer( - &mut self, - peer: PeerId, - request_id: Id, - ) -> Result<(), WrongState> { + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { - self.state = BatchState::Downloading(peer, request_id); + self.state = BatchState::Downloading(request_id); Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -477,8 +472,8 @@ impl std::fmt::Debug for BatchState { BatchState::AwaitingProcessing(ref peer, ref blocks, 
_) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - BatchState::Downloading(peer, request_id) => { - write!(f, "Downloading({}, {})", peer, request_id) + BatchState::Downloading(request_id) => { + write!(f, "Downloading({})", request_id) } BatchState::Poisoned => f.write_str("Poisoned"), } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 813eb7a0c7..be01734417 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,16 +2,13 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::network_context::{RangeRequestId, RpcResponseError}; +use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; -use fnv::FnvHashMap; use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; -use rand::seq::SliceRandom; -use rand::Rng; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; use tracing::{debug, instrument, warn}; @@ -91,7 +88,7 @@ pub struct SyncingChain { /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from, as well as the batches we are currently /// requesting. - peers: FnvHashMap>, + peers: HashSet, /// Starting epoch of the next batch that needs to be downloaded. 
to_be_downloaded: BatchId, @@ -133,9 +130,6 @@ impl SyncingChain { peer_id: PeerId, chain_type: SyncingChainType, ) -> Self { - let mut peers = FnvHashMap::default(); - peers.insert(peer_id, Default::default()); - SyncingChain { id, chain_type, @@ -143,7 +137,7 @@ impl SyncingChain { target_head_slot, target_head_root, batches: BTreeMap::new(), - peers, + peers: HashSet::from_iter([peer_id]), to_be_downloaded: start_epoch, processing_target: start_epoch, optimistic_start: None, @@ -173,7 +167,7 @@ impl SyncingChain { /// Peers currently syncing this chain. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] pub fn peers(&self) -> impl Iterator + '_ { - self.peers.keys().cloned() + self.peers.iter().cloned() } /// Progress in epochs made by the chain @@ -196,29 +190,8 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] - pub fn remove_peer( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> ProcessingResult { - if let Some(batch_ids) = self.peers.remove(peer_id) { - // fail the batches. - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(true)? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: id, - }); - } - self.retry_batch_download(network, id)?; - } else { - debug!(%peer_id, batch = ?id, "Batch not found while removing peer") - } - } - } + pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { + self.peers.remove(peer_id); if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) @@ -270,11 +243,9 @@ impl SyncingChain { // A stream termination has been sent. This batch has ended. Process a completed batch. 
// Remove the request from the peer's active batches - self.peers - .get_mut(peer_id) - .map(|active_requests| active_requests.remove(&batch_id)); - let received = batch.download_completed(blocks)?; + // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 + let received = batch.download_completed(blocks, *peer_id)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -476,7 +447,7 @@ impl SyncingChain { } }; - let peer = batch.current_peer().cloned().ok_or_else(|| { + let peer = batch.processing_peer().cloned().ok_or_else(|| { RemoveChain::WrongBatchState(format!( "Processing target is in wrong state: {:?}", batch.state(), @@ -582,7 +553,7 @@ impl SyncingChain { "Batch failed to download. Dropping chain scoring peers" ); - for (peer, _) in self.peers.drain() { + for peer in self.peers.drain() { network.report_peer(peer, *penalty, "faulty_chain"); } Err(RemoveChain::ChainFailed { @@ -595,7 +566,7 @@ impl SyncingChain { BatchProcessResult::NonFaultyFailure => { batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; // Simply redownload the batch. - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } } } @@ -616,7 +587,7 @@ impl SyncingChain { debug!(%epoch, reason, "Rejected optimistic batch left for future use"); // this batch is now treated as any other batch, and re-requested for future use if redownload { - return self.retry_batch_download(network, epoch); + return self.send_batch(network, epoch); } } else { debug!(%epoch, reason, "Rejected optimistic batch"); @@ -696,12 +667,7 @@ impl SyncingChain { } } } - BatchState::Downloading(peer, ..) => { - // remove this batch from the peer's active requests - if let Some(active_batches) = self.peers.get_mut(peer) { - active_batches.remove(&id); - } - } + BatchState::Downloading(..) 
=> {} BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { crit!("batch indicates inconsistent chain state while advancing chain") } @@ -790,10 +756,10 @@ impl SyncingChain { self.processing_target = self.start_epoch; for id in redownload_queue { - self.retry_batch_download(network, id)?; + self.send_batch(network, id)?; } // finally, re-request the failed batch. - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } pub fn stop_syncing(&mut self) { @@ -849,13 +815,8 @@ impl SyncingChain { network: &mut SyncNetworkContext, peer_id: PeerId, ) -> ProcessingResult { - // add the peer without overwriting its active requests - if self.peers.entry(peer_id).or_default().is_empty() { - // Either new or not, this peer is idle, try to request more batches - self.request_batches(network) - } else { - Ok(KeepChain) - } + self.peers.insert(peer_id); + self.request_batches(network) } /// An RPC error has occurred. @@ -896,16 +857,15 @@ impl SyncingChain { %request_id, "Batch download error" ); - if let Some(active_requests) = self.peers.get_mut(peer_id) { - active_requests.remove(&batch_id); - } - if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(Some(*peer_id))? + { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } else { debug!( batch_epoch = %batch_id, @@ -919,66 +879,42 @@ impl SyncingChain { } } - /// Sends and registers the request of a batch awaiting download. 
- #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] - pub fn retry_batch_download( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - ) -> ProcessingResult { - let Some(batch) = self.batches.get_mut(&batch_id) else { - return Ok(KeepChain); - }; - - // Find a peer to request the batch - let failed_peers = batch.failed_peers(); - - let new_peer = self - .peers - .iter() - .map(|(peer, requests)| { - ( - failed_peers.contains(peer), - requests.len(), - rand::thread_rng().gen::(), - *peer, - ) - }) - // Sort peers prioritizing unrelated peers with less active requests. - .min() - .map(|(_, _, _, peer)| peer); - - if let Some(peer) = new_peer { - self.send_batch(network, batch_id, peer) - } else { - // If we are here the chain has no more peers - Err(RemoveChain::EmptyPeerPool) - } - } - /// Requests the batch assigned to the given id from a given peer. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] pub fn send_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer: PeerId, ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_peers(); + + // TODO(das): we should request only from peers that are part of this SyncingChain. + // However, then we hit the NoPeer error frequently which causes the batch to fail and + // the SyncingChain to be dropped. We need to handle this case more gracefully. 
+ let synced_peers = network + .network_globals() + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + match network.block_components_by_range_request( - peer, batch_type, request, RangeRequestId::RangeSync { chain_id: self.id, batch_id, }, + &synced_peers, + &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer, request_id)?; + batch.start_downloading(request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -988,41 +924,34 @@ impl SyncingChain { } else { debug!(epoch = %batch_id, %batch, %batch_state, "Requesting batch"); } - // register the batch for this peer - return self - .peers - .get_mut(&peer) - .map(|requests| { - requests.insert(batch_id); - Ok(KeepChain) - }) - .unwrap_or_else(|| { - Err(RemoveChain::WrongChainState(format!( - "Sending batch to a peer that is not in the chain: {}", - peer - ))) - }); + return Ok(KeepChain); } - Err(e) => { - // NOTE: under normal conditions this shouldn't happen but we handle it anyway - warn!(%batch_id, error = %e, %batch, "Could not send batch request"); - // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant - self.peers - .get_mut(&peer) - .map(|request| request.remove(&batch_id)); - match batch.download_failed(true)? { - BatchOperationOutcome::Failed { blacklist } => { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: batch_id, - }) - } - BatchOperationOutcome::Continue => { - return self.retry_batch_download(network, batch_id) + Err(e) => match e { + // TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For + // sync to work properly it must be okay to have "stalled" batches in + // AwaitingDownload state. Currently it will error with invalid state if + // that happens. Sync manager must periodically prune stalled batches like + // we do for lookup sync. 
Then we can deprecate the redundant + // `good_peers_on_sampling_subnets` checks. + e + @ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request"); + // register the failed download and check if the batch can be retried + batch.start_downloading(1)?; // fake request_id = 1 is not relevant + match batch.download_failed(None)? { + BatchOperationOutcome::Failed { blacklist } => { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) + } + BatchOperationOutcome::Continue => { + return self.send_batch(network, batch_id) + } } } - } + }, } } @@ -1061,21 +990,6 @@ impl SyncingChain { // find the next pending batch and request it from the peer - // randomize the peers for load balancing - let mut rng = rand::thread_rng(); - let mut idle_peers = self - .peers - .iter() - .filter_map(|(peer, requests)| { - if requests.is_empty() { - Some(*peer) - } else { - None - } - }) - .collect::>(); - idle_peers.shuffle(&mut rng); - // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. 
if let Some(epoch) = self.optimistic_start { @@ -1085,26 +999,25 @@ impl SyncingChain { } if let Entry::Vacant(entry) = self.batches.entry(epoch) { - if let Some(peer) = idle_peers.pop() { - let batch_type = network.batch_type(epoch); - let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); - entry.insert(optimistic_batch); - self.send_batch(network, epoch, peer)?; - } + let batch_type = network.batch_type(epoch); + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); + entry.insert(optimistic_batch); + self.send_batch(network, epoch)?; } return Ok(KeepChain); } - while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch(network) { - // send the batch - self.send_batch(network, batch_id, peer)?; - } else { - // No more batches, simply stop - return Ok(KeepChain); - } + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; } + // No more batches, simply stop Ok(KeepChain) } @@ -1149,6 +1062,7 @@ impl SyncingChain { { return None; } + // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. 
@@ -1177,19 +1091,20 @@ impl SyncingChain { return None; } - let batch_id = self.to_be_downloaded; + // If no batch needs a retry, attempt to send the batch of the next epoch to download + let next_batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch - match self.batches.entry(batch_id) { + match self.batches.entry(next_batch_id) { Entry::Occupied(_) => { // this batch doesn't need downloading, let this same function decide the next batch self.to_be_downloaded += EPOCHS_PER_BATCH; self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(batch_id); - entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type)); + let batch_type = network.batch_type(next_batch_id); + entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type)); self.to_be_downloaded += EPOCHS_PER_BATCH; - Some(batch_id) + Some(next_batch_id) } } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c87418b87b..1ec1440991 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -317,9 +317,8 @@ where skip_all )] fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type, remove_reason) in self - .chains - .call_all(|chain| chain.remove_peer(peer_id, network)) + for (removed_chain, sync_type, remove_reason) in + self.chains.call_all(|chain| chain.remove_peer(peer_id)) { self.on_chain_removed( removed_chain, diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 84c95b2a4c..565d7bc9f8 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -357,10 +357,13 @@ impl TestRig { pub fn new_connected_peer(&mut self) -> PeerId { let key = self.determinstic_key(); - self.network_globals + let peer_id = 
self + .network_globals .peers .write() - .__add_connected_peer_testing_only(false, &self.harness.spec, key) + .__add_connected_peer_testing_only(false, &self.harness.spec, key); + self.log(&format!("Added new peer for testing {peer_id:?}")); + peer_id } pub fn new_connected_supernode_peer(&mut self) -> PeerId {