diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 083887046a..95a4e82fa2 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,6 +1,8 @@ use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; use crate::discovery::{peer_id_to_node_id, CombinedKey}; -use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId}; +use crate::{ + metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo, +}; use itertools::Itertools; use logging::crit; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; @@ -15,7 +17,7 @@ use std::{ use sync_status::SyncStatus; use tracing::{debug, error, trace, warn}; use types::data_column_custody_group::compute_subnets_for_node; -use types::{ChainSpec, DataColumnSubnetId, EthSpec}; +use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot}; pub mod client; pub mod peer_info; @@ -735,6 +737,19 @@ impl PeerDB { }, ); + self.update_sync_status( + &peer_id, + SyncStatus::Synced { + // Fill in mock SyncInfo, only for the peer to return `is_synced() == true`. 
+ info: SyncInfo { + head_slot: Slot::new(0), + head_root: Hash256::ZERO, + finalized_epoch: Epoch::new(0), + finalized_root: Hash256::ZERO, + }, + }, + ); + if supernode { let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); let all_subnets = (0..spec.data_column_sidecar_subnet_count) diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 3031a0dff7..fd99d93589 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -206,6 +206,20 @@ impl NetworkGlobals { .collect::>() } + /// Returns true if the peer is known and is a custodian of `column_index` + pub fn is_custody_peer_of(&self, column_index: ColumnIndex, peer_id: &PeerId) -> bool { + self.peers + .read() + .peer_info(peer_id) + .map(|info| { + info.is_assigned_to_custody_subnet(&DataColumnSubnetId::from_column_index( + column_index, + &self.spec, + )) + }) + .unwrap_or(false) + } + /// Returns the TopicConfig to compute the set of Gossip topics for a given fork pub fn as_topic_config(&self) -> TopicConfig { TopicConfig { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9a8edbfa4c..cfd5c24f99 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1141,7 +1141,7 @@ use { }; #[cfg(test)] -type TestBeaconChainType = +pub(crate) type TestBeaconChainType = Witness, E, MemoryStore, MemoryStore>; #[cfg(test)] diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 509caf7316..fcef06271f 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -10,7 +10,9 @@ use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::manager::BatchProcessResult; -use 
crate::sync::network_context::{RangeRequestId, RpcResponseError, SyncNetworkContext}; +use crate::sync::network_context::{ + RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, +}; use crate::sync::range_sync::{ BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, }; @@ -20,10 +22,9 @@ use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; -use rand::seq::SliceRandom; use std::collections::{ btree_map::{BTreeMap, Entry}, - HashMap, HashSet, + HashSet, }; use std::sync::Arc; use tracing::{debug, error, info, instrument, warn}; @@ -121,9 +122,6 @@ pub struct BackFillSync { /// Sorted map of batches undergoing some kind of processing. batches: BTreeMap>, - /// List of peers we are currently awaiting a response for. - active_requests: HashMap>, - /// The current processing batch, if any. current_processing_batch: Option, @@ -176,7 +174,6 @@ impl BackFillSync { let bfs = BackFillSync { batches: BTreeMap::new(), - active_requests: HashMap::new(), processing_target: current_start, current_start, last_batch_downloaded: false, @@ -314,45 +311,11 @@ impl BackFillSync { skip_all )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { if matches!(self.state(), BackFillState::Failed) { return Ok(()); } - if let Some(batch_ids) = self.active_requests.remove(peer_id) { - // fail the batches. 
- for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - match batch.download_failed(false) { - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(id))?; - } - Ok(BatchOperationOutcome::Continue) => {} - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; - } - } - // If we have run out of peers in which to retry this batch, the backfill state - // transitions to a paused state. - // We still need to reset the state for all the affected batches, so we should not - // short circuit early. - if self.retry_batch_download(network, id).is_err() { - debug!( - batch_id = %id, - error = "no synced peers", - "Batch could not be retried" - ); - } - } else { - debug!(peer = %peer_id, batch = %id, "Batch not found while removing peer"); - } - } - } - // Remove the peer from the participation list self.participating_peers.remove(peer_id); Ok(()) @@ -386,15 +349,12 @@ impl BackFillSync { return Ok(()); } debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); - if let Some(active_requests) = self.active_requests.get_mut(peer_id) { - active_requests.remove(&batch_id); - } - match batch.download_failed(true) { + match batch.download_failed(Some(*peer_id)) { Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)), Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)) } - Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id), } } else { // this could be an error for an old batch, removed when the chain advances @@ -435,19 +395,11 @@ impl BackFillSync { // sending an error /timeout) if the peer is removed from the chain for other // reasons. 
Check that this block belongs to the expected peer, and that the // request_id matches - // TODO(das): removed peer_id matching as the node may request a different peer for data - // columns. if !batch.is_expecting_block(&request_id) { return Ok(ProcessResult::Successful); } - // A stream termination has been sent. This batch has ended. Process a completed batch. - // Remove the request from the peer's active batches - self.active_requests - .get_mut(peer_id) - .map(|active_requests| active_requests.remove(&batch_id)); - - match batch.download_completed(blocks) { + match batch.download_completed(blocks, *peer_id) { Ok(received) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; @@ -488,7 +440,6 @@ impl BackFillSync { self.set_state(BackFillState::Failed); // Remove all batches and active requests and participating peers. self.batches.clear(); - self.active_requests.clear(); self.participating_peers.clear(); self.restart_failed_sync = false; @@ -622,7 +573,7 @@ impl BackFillSync { } }; - let Some(peer) = batch.current_peer() else { + let Some(peer) = batch.processing_peer() else { self.fail_sync(BackFillError::BatchInvalidState( batch_id, String::from("Peer does not exist"), @@ -698,6 +649,8 @@ impl BackFillSync { ); for peer in self.participating_peers.drain() { + // TODO(das): `participating_peers` only includes block peers. Should we + // penalize the custody column peers too? network.report_peer(peer, *penalty, "backfill_batch_failed"); } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) @@ -723,7 +676,7 @@ impl BackFillSync { { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } - self.retry_batch_download(network, batch_id)?; + self.send_batch(network, batch_id)?; Ok(ProcessResult::Successful) } } @@ -864,12 +817,7 @@ impl BackFillSync { } } } - BatchState::Downloading(peer, ..) 
=> { - // remove this batch from the peer's active requests - if let Some(active_requests) = self.active_requests.get_mut(peer) { - active_requests.remove(&id); - } - } + BatchState::Downloading(..) => {} BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { crit!("batch indicates inconsistent chain state while advancing chain") } @@ -951,57 +899,10 @@ impl BackFillSync { self.processing_target = self.current_start; for id in redownload_queue { - self.retry_batch_download(network, id)?; + self.send_batch(network, id)?; } // finally, re-request the failed batch. - self.retry_batch_download(network, batch_id) - } - - /// Sends and registers the request of a batch awaiting download. - #[instrument(parent = None, - level = "info", - fields(service = "backfill_sync"), - name = "backfill_sync", - skip_all - )] - fn retry_batch_download( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - ) -> Result<(), BackFillError> { - let Some(batch) = self.batches.get_mut(&batch_id) else { - return Ok(()); - }; - - // Find a peer to request the batch - let failed_peers = batch.failed_peers(); - - let new_peer = self - .network_globals - .peers - .read() - .synced_peers() - .map(|peer| { - ( - failed_peers.contains(peer), - self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0), - rand::random::(), - *peer, - ) - }) - // Sort peers prioritizing unrelated peers with less active requests. - .min() - .map(|(_, _, _, peer)| peer); - - if let Some(peer) = new_peer { - self.participating_peers.insert(peer); - self.send_batch(network, batch_id, peer) - } else { - // If we are here the chain has no more synced peers - info!(reason = "insufficient_synced_peers", "Backfill sync paused"); - self.set_state(BackFillState::Paused); - Err(BackFillError::Paused) - } + self.send_batch(network, batch_id) } /// Requests the batch assigned to the given id from a given peer. 
@@ -1015,53 +916,65 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer: PeerId, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + let (request, is_blob_batch) = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( - peer, is_blob_batch, request, RangeRequestId::BackfillSync { batch_id }, + &synced_peers, + &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request - if let Err(e) = batch.start_downloading_from_peer(peer, request_id) { + if let Err(e) = batch.start_downloading(request_id) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); } debug!(epoch = %batch_id, %batch, "Requesting batch"); - // register the batch for this peer - self.active_requests - .entry(peer) - .or_default() - .insert(batch_id); return Ok(()); } - Err(e) => { - // NOTE: under normal conditions this shouldn't happen but we handle it anyway - warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); - // register the failed download and check if the batch can be retried - if let Err(e) = batch.start_downloading_from_peer(peer, 1) { - return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + Err(e) => match e { + RpcRequestSendError::NoPeer(no_peer) => { + // If we are here the chain has no more synced peers + info!( + "reason" = format!("insufficient_synced_peers({no_peer:?})"), + "Backfill sync paused" + ); + self.set_state(BackFillState::Paused); + return Err(BackFillError::Paused); } - self.active_requests - .get_mut(&peer) - .map(|request| request.remove(&batch_id)); + RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch,"Could not send batch 
request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } - match batch.download_failed(true) { - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? - } - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? - } - Ok(BatchOperationOutcome::Continue) => { - return self.retry_batch_download(network, batch_id) + match batch.download_failed(None) { + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id) + } } } - } + }, } } @@ -1093,7 +1006,7 @@ impl BackFillSync { .collect::>(); for batch_id in batch_ids_to_retry { - self.retry_batch_download(network, batch_id)?; + self.send_batch(network, batch_id)?; } Ok(()) } @@ -1115,34 +1028,16 @@ impl BackFillSync { } // find the next pending batch and request it from the peer - - // randomize the peers for load balancing - let mut rng = rand::thread_rng(); - let mut idle_peers = self - .network_globals - .peers - .read() - .synced_peers() - .filter(|peer_id| { - self.active_requests - .get(peer_id) - .map(|requests| requests.is_empty()) - .unwrap_or(true) - }) - .cloned() - .collect::>(); - - idle_peers.shuffle(&mut rng); - - while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch(network) { - // send the batch - self.send_batch(network, batch_id, peer)?; - } else { - // No more batches, simply stop - return Ok(()); - } + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the 
`BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; } + + // No more batches, simply stop Ok(()) } @@ -1296,3 +1191,73 @@ enum ResetEpochError { /// The chain has already completed. SyncCompleted, } + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::test_utils::BeaconChainHarness; + use bls::Hash256; + use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus}; + use rand::prelude::StdRng; + use rand::SeedableRng; + use types::MinimalEthSpec; + + #[test] + fn request_batches_should_not_loop_infinitely() { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(4) + .fresh_ephemeral_store() + .build(); + + let beacon_chain = harness.chain.clone(); + let slots_per_epoch = MinimalEthSpec::slots_per_epoch(); + + let network_globals = Arc::new(NetworkGlobals::new_test_globals( + vec![], + Arc::new(NetworkConfig::default()), + beacon_chain.spec.clone(), + )); + + { + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let peer_id = network_globals + .peers + .write() + .__add_connected_peer_testing_only( + true, + &beacon_chain.spec, + k256::ecdsa::SigningKey::random(&mut rng).into(), + ); + + // Simulate finalized epoch and head being 2 epochs ahead + let finalized_epoch = Epoch::new(40); + let head_epoch = finalized_epoch + 2; + let head_slot = head_epoch.start_slot(slots_per_epoch) + 1; + + network_globals.peers.write().update_sync_status( + &peer_id, + SyncStatus::Synced { + info: SyncInfo { + head_slot, + head_root: Hash256::random(), + finalized_epoch, + finalized_root: Hash256::random(), + }, + }, + ); + } + + let mut network = SyncNetworkContext::new_for_testing( + beacon_chain.clone(), + network_globals.clone(), + harness.runtime.task_executor.clone(), + ); + + let mut backfill = BackFillSync::new(beacon_chain, network_globals); + 
backfill.set_state(BackFillState::Syncing); + + // if this ends up running into an infinite loop, the test will overflow the stack pretty quickly. + let _ = backfill.request_batches(&mut network); + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 84e492c04f..9119b1652c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -515,9 +515,7 @@ impl SyncManager { // Remove peer from all data structures self.range_sync.peer_disconnect(&mut self.network, peer_id); - let _ = self - .backfill_sync - .peer_disconnected(peer_id, &mut self.network); + let _ = self.backfill_sync.peer_disconnected(peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2cb5ec9a0a..d9eda651e7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -9,6 +9,8 @@ use super::range_sync::ByRangeRequestType; use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; +#[cfg(test)] +use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; @@ -27,18 +29,20 @@ use lighthouse_network::service::api_types::{ }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; -use rand::prelude::IteratorRandom; -use rand::thread_rng; pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, }; +#[cfg(test)] +use slot_clock::SlotClock; use std::collections::hash_map::Entry; use 
std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; +#[cfg(test)] +use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{debug, error, span, warn, Level}; use types::blob_sidecar::FixedBlobSidecarList; @@ -82,24 +86,18 @@ pub enum RpcResponseError { #[derive(Debug, PartialEq, Eq)] pub enum RpcRequestSendError { - /// Network channel send failed - NetworkSendError, - NoCustodyPeers, - CustodyRequestError(custody::Error), - SlotClockError, + /// No peer available matching the required criteria + NoPeer(NoPeerError), + /// These errors should never happen, including unreachable custody errors or network send + /// errors. + InternalError(String), } -impl std::fmt::Display for RpcRequestSendError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - RpcRequestSendError::NetworkSendError => write!(f, "Network send error"), - RpcRequestSendError::NoCustodyPeers => write!(f, "No custody peers"), - RpcRequestSendError::CustodyRequestError(e) => { - write!(f, "Custody request error: {:?}", e) - } - RpcRequestSendError::SlotClockError => write!(f, "Slot clock error"), - } - } +/// Type of peer missing that caused a `RpcRequestSendError::NoPeer` +#[derive(Debug, PartialEq, Eq)] +pub enum NoPeerError { + BlockPeer, + CustodyPeer(ColumnIndex), } #[derive(Debug, PartialEq, Eq)] @@ -232,6 +230,35 @@ pub enum RangeBlockComponent { ), } +#[cfg(test)] +impl SyncNetworkContext> { + pub fn new_for_testing( + beacon_chain: Arc>>, + network_globals: Arc>, + task_executor: TaskExecutor, + ) -> Self { + let fork_context = Arc::new(ForkContext::new::( + beacon_chain.slot_clock.now().unwrap_or(Slot::new(0)), + beacon_chain.genesis_validators_root, + &beacon_chain.spec, + )); + let (network_tx, _network_rx) = mpsc::unbounded_channel(); + let (beacon_processor, _) = NetworkBeaconProcessor::null_for_testing( + network_globals, + mpsc::unbounded_channel().0, + beacon_chain.clone(), + 
task_executor, + ); + + SyncNetworkContext::new( + network_tx, + Arc::new(beacon_processor), + beacon_chain, + fork_context, + ) + } +} + impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -331,12 +358,6 @@ impl SyncNetworkContext { .custody_peers_for_column(column_index) } - pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option { - self.get_custodial_peers(column_index) - .into_iter() - .choose(&mut thread_rng()) - } - pub fn network_globals(&self) -> &NetworkGlobals { &self.network_beacon_processor.network_globals } @@ -381,34 +402,102 @@ impl SyncNetworkContext { } } + fn active_request_count_by_peer(&self) -> HashMap { + let Self { + network_send: _, + request_id: _, + blocks_by_root_requests, + blobs_by_root_requests, + data_columns_by_root_requests, + blocks_by_range_requests, + blobs_by_range_requests, + data_columns_by_range_requests, + // custody_by_root_requests is a meta request of data_columns_by_root_requests + custody_by_root_requests: _, + // components_by_range_requests is a meta request of various _by_range requests + components_by_range_requests: _, + execution_engine_state: _, + network_beacon_processor: _, + chain: _, + fork_context: _, + // Don't use a fallback match. 
We want to be sure that all requests are considered when + // adding new ones + } = self; + + let mut active_request_count_by_peer = HashMap::::new(); + + for peer_id in blocks_by_root_requests + .iter_request_peers() + .chain(blobs_by_root_requests.iter_request_peers()) + .chain(data_columns_by_root_requests.iter_request_peers()) + .chain(blocks_by_range_requests.iter_request_peers()) + .chain(blobs_by_range_requests.iter_request_peers()) + .chain(data_columns_by_range_requests.iter_request_peers()) + { + *active_request_count_by_peer.entry(peer_id).or_default() += 1; + } + + active_request_count_by_peer + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, - peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, requester: RangeRequestId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + + let Some(block_peer) = peers + .iter() + .map(|peer| { + ( + // If contains -> 1 (order after), not contains -> 0 (order first) + peers_to_deprioritize.contains(peer), + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, _, peer)| *peer) + else { + // Backfill and forward sync handle this condition gracefully. + // - Backfill sync: will pause waiting for more peers to join + // - Forward sync: can never happen as the chain is dropped when removing the last peer. 
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer)); + }; + + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = + if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { + let column_indexes = self.network_globals().sampling_columns.clone(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; + // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { id: self.next_id(), requester, }; - // Compute custody column peers before sending the blocks_by_range request. If we don't have - // enough peers, error here. - let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns.clone(); - Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?) 
- } else { - None - }; - - let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?; + let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?; let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { Some(self.send_blobs_by_range_request( - peer_id, + block_peer, BlobsByRangeRequest { start_slot: *request.start_slot(), count: *request.count(), @@ -419,64 +508,98 @@ impl SyncNetworkContext { None }; - let data_columns = if let Some(data_column_requests) = data_column_requests { - let data_column_requests = data_column_requests - .into_iter() - .map(|(peer_id, columns_by_range_request)| { - self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id) - }) - .collect::, _>>()?; + let data_column_requests = columns_by_range_peers_to_request + .map(|columns_by_range_peers_to_request| { + columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_request( + peer_id, + DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns, + }, + id, + ) + }) + .collect::, _>>() + }) + .transpose()?; - Some(( - data_column_requests, - self.network_globals() - .sampling_columns - .iter() - .cloned() - .collect::>(), - )) - } else { - None - }; - - let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns); + let info = RangeBlockComponentsRequest::new( + blocks_req_id, + blobs_req_id, + data_column_requests.map(|data_column_requests| { + ( + data_column_requests, + self.network_globals() + .sampling_columns + .clone() + .iter() + .copied() + .collect(), + ) + }), + ); self.components_by_range_requests.insert(id, info); Ok(id.id) } - fn make_columns_by_range_requests( + fn select_columns_by_range_peers_to_request( &self, - request: BlocksByRangeRequest, custody_indexes: &HashSet, - ) -> Result, RpcRequestSendError> { - let mut peer_id_to_request_map = 
HashMap::new(); + peers: &HashSet, + active_request_count_by_peer: HashMap, + peers_to_deprioritize: &HashSet, + ) -> Result>, RpcRequestSendError> { + let mut columns_to_request_by_peer = HashMap::>::new(); for column_index in custody_indexes { - // TODO(das): The peer selection logic here needs to be improved - we should probably - // avoid retrying from failed peers, however `BatchState` currently only tracks the peer - // serving the blocks. - let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else { + // Strictly consider peers that are custodials of this column AND are part of this + // syncing chain. If the forward range sync chain has few peers, it's likely that this + // function will not be able to find peers on our custody columns. + let Some(custody_peer) = peers + .iter() + .filter(|peer| { + self.network_globals() + .is_custody_peer_of(*column_index, peer) + }) + .map(|peer| { + ( + // If contains -> 1 (order after), not contains -> 0 (order first) + peers_to_deprioritize.contains(peer), + // Prefer peers with less overall requests + // Also account for requests that are not yet issued tracked in columns_to_request_by_peer + // We batch requests to the same peer, so count existence in the + // `columns_to_request_by_peer` as a single 1 request. + active_request_count_by_peer.get(peer).copied().unwrap_or(0) + + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, _, peer)| *peer) + else { // TODO(das): this will be pretty bad UX. To improve we should: - // - Attempt to fetch custody requests first, before requesting blocks // - Handle the no peers case gracefully, maybe add some timeout and give a few // minutes / seconds to the peer manager to locate peers on this subnet before // abandoing progress on the chain completely. 
- return Err(RpcRequestSendError::NoCustodyPeers); + return Err(RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer( + *column_index, + ))); }; - let columns_by_range_request = peer_id_to_request_map + columns_to_request_by_peer .entry(custody_peer) - .or_insert_with(|| DataColumnsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - columns: vec![], - }); - - columns_by_range_request.columns.push(*column_index); + .or_default() + .push(*column_index); } - Ok(peer_id_to_request_map) + Ok(columns_to_request_by_peer) } /// Received a blocks by range or blobs by range response for a request that couples blocks ' @@ -536,11 +659,21 @@ impl SyncNetworkContext { lookup_peers: Arc>>, block_root: Hash256, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() .iter() - .choose(&mut rand::thread_rng()) - .copied() + .map(|peer| { + ( + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) else { // Allow lookup to not have any peers and do nothing. 
This is an optimization to not // lose progress of lookups created from a block with unknown parent before we receive @@ -597,7 +730,7 @@ impl SyncNetworkContext { request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)), app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlocksByRoot", @@ -632,11 +765,21 @@ impl SyncNetworkContext { block_root: Hash256, expected_blobs: usize, ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() .iter() - .choose(&mut rand::thread_rng()) - .copied() + .map(|peer| { + ( + // Prefer peers with less overall requests + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) else { // Allow lookup to not have any peers and do nothing. 
This is an optimization to not // lose progress of lookups created from a block with unknown parent before we receive @@ -686,7 +829,7 @@ impl SyncNetworkContext { request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)), app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlobsByRoot", @@ -821,7 +964,25 @@ impl SyncNetworkContext { self.custody_by_root_requests.insert(requester, request); Ok(LookupRequestResult::RequestSent(id.req_id)) } - Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)), + Err(e) => Err(match e { + CustodyRequestError::NoPeer(column_index) => { + RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(column_index)) + } + // - TooManyFailures: Should never happen, `request` has just been created, it's + // count of download_failures is 0 here + // - BadState: Should never happen, a bad state can only happen when handling a + // network response + // - UnexpectedRequestId: Never happens: this Err is only constructed handling a + // download or processing response + // - SendFailed: Should never happen unless in a bad drop sequence when shutting + // down the node + e @ (CustodyRequestError::TooManyFailures + | CustodyRequestError::BadState { .. } + | CustodyRequestError::UnexpectedRequestId { .. } + | CustodyRequestError::SendFailed { .. 
}) => { + RpcRequestSendError::InternalError(format!("{e:?}")) + } + }), } } @@ -841,7 +1002,7 @@ impl SyncNetworkContext { request: RequestType::BlocksByRange(request.clone().into()), app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlocksByRange", @@ -882,7 +1043,7 @@ impl SyncNetworkContext { request: RequestType::BlobsByRange(request.clone()), app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "BlobsByRange", @@ -921,7 +1082,7 @@ impl SyncNetworkContext { request: RequestType::DataColumnsByRange(request.clone()), app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; debug!( method = "DataColumnsByRange", diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index e7e6e62349..f4d010b881 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -45,7 +45,7 @@ pub enum Error { SendFailed(&'static str), TooManyFailures, BadState(String), - NoPeers(ColumnIndex), + NoPeer(ColumnIndex), /// Received a download result for a different request id than the in-flight request. /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. 
@@ -56,7 +56,6 @@ pub enum Error { } struct ActiveBatchColumnsRequest { - peer_id: PeerId, indices: Vec, } @@ -220,6 +219,7 @@ impl ActiveCustodyRequest { return Ok(Some((columns, peer_group, max_seen_timestamp))); } + let active_request_count_by_peer = cx.active_request_count_by_peer(); let mut columns_to_request_by_peer = HashMap::>::new(); let lookup_peers = self.lookup_peers.read(); @@ -238,15 +238,11 @@ impl ActiveCustodyRequest { // only query the peers on that fork. Should this case be handled? How to handle it? let custodial_peers = cx.get_custodial_peers(*column_index); - // TODO(das): cache this computation in a OneCell or similar to prevent having to - // run it every loop - let mut active_requests_by_peer = HashMap::::new(); - for batch_request in self.active_batch_columns_requests.values() { - *active_requests_by_peer - .entry(batch_request.peer_id) - .or_default() += 1; - } - + // We draw from the total set of peers, but prioritize those peers who we have + // received an attestation / status / block message claiming to have imported the + // lookup. The frequency of those messages is low, so drawing only from lookup_peers + // could cause many lookups to take much longer or fail as they don't have enough + // custody peers on a given column let mut priorized_peers = custodial_peers .iter() .map(|peer| { @@ -256,9 +252,12 @@ impl ActiveCustodyRequest { // De-prioritize peers that have failed to successfully respond to // requests recently self.failed_peers.contains(peer), - // Prefer peers with less requests to load balance across peers - active_requests_by_peer.get(peer).copied().unwrap_or(0), - // Final random factor to give all peers a shot in each retry + // Prefer peers with fewer requests to load balance across peers. + // We batch requests to the same peer, so count existence in the + // `columns_to_request_by_peer` as a single 1 request. 
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0) + + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), + // Random factor to break ties, otherwise the PeerID breaks ties rand::thread_rng().gen::(), *peer, ) @@ -276,7 +275,7 @@ impl ActiveCustodyRequest { // `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that // lookup will naturally retry when other peers send us attestations for // descendants of this un-available lookup. - return Err(Error::NoPeers(*column_index)); + return Err(Error::NoPeer(*column_index)); } else { // Do not issue requests if there is no custody peer on this column } @@ -306,13 +305,14 @@ impl ActiveCustodyRequest { let column_request = self .column_requests .get_mut(column_index) + // Should never happen: column_index is iterated from column_requests .ok_or(Error::BadState("unknown column_index".to_owned()))?; column_request.on_download_start(req_id)?; } self.active_batch_columns_requests - .insert(req_id, ActiveBatchColumnsRequest { indices, peer_id }); + .insert(req_id, ActiveBatchColumnsRequest { indices }); } LookupRequestResult::NoRequestNeeded(_) => unreachable!(), LookupRequestResult::Pending(_) => unreachable!(), diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index c9b85e47b6..963b633ed6 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -179,6 +179,10 @@ impl ActiveRequests { .collect() } + pub fn iter_request_peers(&self) -> impl Iterator + '_ { + self.requests.values().map(|request| request.peer_id) + } + pub fn len(&self) -> usize { self.requests.len() } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index c1ad550376..264f83ee82 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -107,7 
+107,7 @@ pub struct BatchInfo { /// Number of processing attempts that have failed but we do not count. non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. - failed_download_attempts: Vec, + failed_download_attempts: Vec>, /// State of the batch. state: BatchState, /// Whether this batch contains all blocks or all blocks and blobs. @@ -132,7 +132,7 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Id), + Downloading(Id), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, Vec>, Instant), /// The batch is being processed. @@ -197,8 +197,8 @@ impl BatchInfo { peers.insert(attempt.peer_id); } - for download in &self.failed_download_attempts { - peers.insert(*download); + for peer in self.failed_download_attempts.iter().flatten() { + peers.insert(*peer); } peers @@ -206,18 +206,17 @@ impl BatchInfo { /// Verifies if an incoming block belongs to this batch. pub fn is_expecting_block(&self, request_id: &Id) -> bool { - if let BatchState::Downloading(_, expected_id) = &self.state { + if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } false } /// Returns the peer that is currently responsible for progressing the state of the batch. - pub fn current_peer(&self) -> Option<&PeerId> { + pub fn processing_peer(&self) -> Option<&PeerId> { match &self.state { - BatchState::AwaitingDownload | BatchState::Failed => None, - BatchState::Downloading(peer_id, _) - | BatchState::AwaitingProcessing(peer_id, _, _) + BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None, + BatchState::AwaitingProcessing(peer_id, _, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. 
}) => Some(peer_id), BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -276,9 +275,10 @@ impl BatchInfo { pub fn download_completed( &mut self, blocks: Vec>, + peer: PeerId, ) -> Result { match self.state.poison() { - BatchState::Downloading(peer, _request_id) => { + BatchState::Downloading(_) => { let received = blocks.len(); self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); Ok(received) @@ -297,19 +297,18 @@ impl BatchInfo { /// Mark the batch as failed and return whether we can attempt a re-download. /// /// This can happen if a peer disconnects or some error occurred that was not the peers fault. - /// THe `mark_failed` parameter, when set to false, does not increment the failed attempts of + /// The `peer` parameter, when set to None, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] pub fn download_failed( &mut self, - mark_failed: bool, + peer: Option, ) -> Result { match self.state.poison() { - BatchState::Downloading(peer, _request_id) => { + BatchState::Downloading(_) => { // register the attempt and check if the batch can be tried again - if mark_failed { - self.failed_download_attempts.push(peer); - } + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() >= B::max_batch_download_attempts() as usize { @@ -331,14 +330,10 @@ impl BatchInfo { } } - pub fn start_downloading_from_peer( - &mut self, - peer: PeerId, - request_id: Id, - ) -> Result<(), WrongState> { + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { - self.state = BatchState::Downloading(peer, request_id); + self.state = BatchState::Downloading(request_id); Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -477,8 +472,8 @@ impl std::fmt::Debug for BatchState { BatchState::AwaitingProcessing(ref peer, ref blocks, 
_) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - BatchState::Downloading(peer, request_id) => { - write!(f, "Downloading({}, {})", peer, request_id) + BatchState::Downloading(request_id) => { + write!(f, "Downloading({})", request_id) } BatchState::Poisoned => f.write_str("Poisoned"), } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 813eb7a0c7..be01734417 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,16 +2,13 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::network_context::{RangeRequestId, RpcResponseError}; +use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; -use fnv::FnvHashMap; use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; -use rand::seq::SliceRandom; -use rand::Rng; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; use tracing::{debug, instrument, warn}; @@ -91,7 +88,7 @@ pub struct SyncingChain { /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from, as well as the batches we are currently /// requesting. - peers: FnvHashMap>, + peers: HashSet, /// Starting epoch of the next batch that needs to be downloaded. 
to_be_downloaded: BatchId, @@ -133,9 +130,6 @@ impl SyncingChain { peer_id: PeerId, chain_type: SyncingChainType, ) -> Self { - let mut peers = FnvHashMap::default(); - peers.insert(peer_id, Default::default()); - SyncingChain { id, chain_type, @@ -143,7 +137,7 @@ impl SyncingChain { target_head_slot, target_head_root, batches: BTreeMap::new(), - peers, + peers: HashSet::from_iter([peer_id]), to_be_downloaded: start_epoch, processing_target: start_epoch, optimistic_start: None, @@ -173,7 +167,7 @@ impl SyncingChain { /// Peers currently syncing this chain. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] pub fn peers(&self) -> impl Iterator + '_ { - self.peers.keys().cloned() + self.peers.iter().cloned() } /// Progress in epochs made by the chain @@ -196,29 +190,8 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] - pub fn remove_peer( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> ProcessingResult { - if let Some(batch_ids) = self.peers.remove(peer_id) { - // fail the batches. - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(true)? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: id, - }); - } - self.retry_batch_download(network, id)?; - } else { - debug!(%peer_id, batch = ?id, "Batch not found while removing peer") - } - } - } + pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { + self.peers.remove(peer_id); if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) @@ -270,11 +243,9 @@ impl SyncingChain { // A stream termination has been sent. This batch has ended. Process a completed batch. 
// Remove the request from the peer's active batches - self.peers - .get_mut(peer_id) - .map(|active_requests| active_requests.remove(&batch_id)); - let received = batch.download_completed(blocks)?; + // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 + let received = batch.download_completed(blocks, *peer_id)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -476,7 +447,7 @@ impl SyncingChain { } }; - let peer = batch.current_peer().cloned().ok_or_else(|| { + let peer = batch.processing_peer().cloned().ok_or_else(|| { RemoveChain::WrongBatchState(format!( "Processing target is in wrong state: {:?}", batch.state(), @@ -582,7 +553,7 @@ impl SyncingChain { "Batch failed to download. Dropping chain scoring peers" ); - for (peer, _) in self.peers.drain() { + for peer in self.peers.drain() { network.report_peer(peer, *penalty, "faulty_chain"); } Err(RemoveChain::ChainFailed { @@ -595,7 +566,7 @@ impl SyncingChain { BatchProcessResult::NonFaultyFailure => { batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; // Simply redownload the batch. - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } } } @@ -616,7 +587,7 @@ impl SyncingChain { debug!(%epoch, reason, "Rejected optimistic batch left for future use"); // this batch is now treated as any other batch, and re-requested for future use if redownload { - return self.retry_batch_download(network, epoch); + return self.send_batch(network, epoch); } } else { debug!(%epoch, reason, "Rejected optimistic batch"); @@ -696,12 +667,7 @@ impl SyncingChain { } } } - BatchState::Downloading(peer, ..) => { - // remove this batch from the peer's active requests - if let Some(active_batches) = self.peers.get_mut(peer) { - active_batches.remove(&id); - } - } + BatchState::Downloading(..) 
=> {} BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { crit!("batch indicates inconsistent chain state while advancing chain") } @@ -790,10 +756,10 @@ impl SyncingChain { self.processing_target = self.start_epoch; for id in redownload_queue { - self.retry_batch_download(network, id)?; + self.send_batch(network, id)?; } // finally, re-request the failed batch. - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } pub fn stop_syncing(&mut self) { @@ -849,13 +815,8 @@ impl SyncingChain { network: &mut SyncNetworkContext, peer_id: PeerId, ) -> ProcessingResult { - // add the peer without overwriting its active requests - if self.peers.entry(peer_id).or_default().is_empty() { - // Either new or not, this peer is idle, try to request more batches - self.request_batches(network) - } else { - Ok(KeepChain) - } + self.peers.insert(peer_id); + self.request_batches(network) } /// An RPC error has occurred. @@ -896,16 +857,15 @@ impl SyncingChain { %request_id, "Batch download error" ); - if let Some(active_requests) = self.peers.get_mut(peer_id) { - active_requests.remove(&batch_id); - } - if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(Some(*peer_id))? + { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } - self.retry_batch_download(network, batch_id) + self.send_batch(network, batch_id) } else { debug!( batch_epoch = %batch_id, @@ -919,66 +879,42 @@ impl SyncingChain { } } - /// Sends and registers the request of a batch awaiting download. 
- #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] - pub fn retry_batch_download( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - ) -> ProcessingResult { - let Some(batch) = self.batches.get_mut(&batch_id) else { - return Ok(KeepChain); - }; - - // Find a peer to request the batch - let failed_peers = batch.failed_peers(); - - let new_peer = self - .peers - .iter() - .map(|(peer, requests)| { - ( - failed_peers.contains(peer), - requests.len(), - rand::thread_rng().gen::(), - *peer, - ) - }) - // Sort peers prioritizing unrelated peers with less active requests. - .min() - .map(|(_, _, _, peer)| peer); - - if let Some(peer) = new_peer { - self.send_batch(network, batch_id, peer) - } else { - // If we are here the chain has no more peers - Err(RemoveChain::EmptyPeerPool) - } - } - /// Requests the batch assigned to the given id from a given peer. #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] pub fn send_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer: PeerId, ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_peers(); + + // TODO(das): we should request only from peers that are part of this SyncingChain. + // However, then we hit the NoPeer error frequently which causes the batch to fail and + // the SyncingChain to be dropped. We need to handle this case more gracefully. 
+ let synced_peers = network + .network_globals() + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + match network.block_components_by_range_request( - peer, batch_type, request, RangeRequestId::RangeSync { chain_id: self.id, batch_id, }, + &synced_peers, + &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer, request_id)?; + batch.start_downloading(request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -988,41 +924,34 @@ impl SyncingChain { } else { debug!(epoch = %batch_id, %batch, %batch_state, "Requesting batch"); } - // register the batch for this peer - return self - .peers - .get_mut(&peer) - .map(|requests| { - requests.insert(batch_id); - Ok(KeepChain) - }) - .unwrap_or_else(|| { - Err(RemoveChain::WrongChainState(format!( - "Sending batch to a peer that is not in the chain: {}", - peer - ))) - }); + return Ok(KeepChain); } - Err(e) => { - // NOTE: under normal conditions this shouldn't happen but we handle it anyway - warn!(%batch_id, error = %e, %batch, "Could not send batch request"); - // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant - self.peers - .get_mut(&peer) - .map(|request| request.remove(&batch_id)); - match batch.download_failed(true)? { - BatchOperationOutcome::Failed { blacklist } => { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: batch_id, - }) - } - BatchOperationOutcome::Continue => { - return self.retry_batch_download(network, batch_id) + Err(e) => match e { + // TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For + // sync to work properly it must be okay to have "stalled" batches in + // AwaitingDownload state. Currently it will error with invalid state if + // that happens. Sync manager must periodically prune stalled batches like + // we do for lookup sync. 
Then we can deprecate the redundant + // `good_peers_on_sampling_subnets` checks. + e + @ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request"); + // register the failed download and check if the batch can be retried + batch.start_downloading(1)?; // fake request_id = 1 is not relevant + match batch.download_failed(None)? { + BatchOperationOutcome::Failed { blacklist } => { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) + } + BatchOperationOutcome::Continue => { + return self.send_batch(network, batch_id) + } } } - } + }, } } @@ -1061,21 +990,6 @@ impl SyncingChain { // find the next pending batch and request it from the peer - // randomize the peers for load balancing - let mut rng = rand::thread_rng(); - let mut idle_peers = self - .peers - .iter() - .filter_map(|(peer, requests)| { - if requests.is_empty() { - Some(*peer) - } else { - None - } - }) - .collect::>(); - idle_peers.shuffle(&mut rng); - // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. 
if let Some(epoch) = self.optimistic_start { @@ -1085,26 +999,25 @@ impl SyncingChain { } if let Entry::Vacant(entry) = self.batches.entry(epoch) { - if let Some(peer) = idle_peers.pop() { - let batch_type = network.batch_type(epoch); - let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); - entry.insert(optimistic_batch); - self.send_batch(network, epoch, peer)?; - } + let batch_type = network.batch_type(epoch); + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); + entry.insert(optimistic_batch); + self.send_batch(network, epoch)?; } return Ok(KeepChain); } - while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch(network) { - // send the batch - self.send_batch(network, batch_id, peer)?; - } else { - // No more batches, simply stop - return Ok(KeepChain); - } + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; } + // No more batches, simply stop Ok(KeepChain) } @@ -1149,6 +1062,7 @@ impl SyncingChain { { return None; } + // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. 
@@ -1177,19 +1091,20 @@ impl SyncingChain { return None; } - let batch_id = self.to_be_downloaded; + // If no batch needs a retry, attempt to send the batch of the next epoch to download + let next_batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch - match self.batches.entry(batch_id) { + match self.batches.entry(next_batch_id) { Entry::Occupied(_) => { // this batch doesn't need downloading, let this same function decide the next batch self.to_be_downloaded += EPOCHS_PER_BATCH; self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(batch_id); - entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type)); + let batch_type = network.batch_type(next_batch_id); + entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type)); self.to_be_downloaded += EPOCHS_PER_BATCH; - Some(batch_id) + Some(next_batch_id) } } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c87418b87b..1ec1440991 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -317,9 +317,8 @@ where skip_all )] fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type, remove_reason) in self - .chains - .call_all(|chain| chain.remove_peer(peer_id, network)) + for (removed_chain, sync_type, remove_reason) in + self.chains.call_all(|chain| chain.remove_peer(peer_id)) { self.on_chain_removed( removed_chain, diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 84c95b2a4c..565d7bc9f8 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -357,10 +357,13 @@ impl TestRig { pub fn new_connected_peer(&mut self) -> PeerId { let key = self.determinstic_key(); - self.network_globals + let peer_id = 
self + .network_globals .peers .write() - .__add_connected_peer_testing_only(false, &self.harness.spec, key) + .__add_connected_peer_testing_only(false, &self.harness.spec, key); + self.log(&format!("Added new peer for testing {peer_id:?}")); + peer_id } pub fn new_connected_supernode_peer(&mut self) -> PeerId {