diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 0ccad8d042..87337cafcf 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -247,23 +247,16 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } - /// Returns all the synced peers from the list of allowed peers that claim to have the block + /// Returns all the synced peers from the peer db that claim to have the block /// components for the given epoch based on `status.earliest_available_slot`. /// /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the /// required data. - /// - /// If `allowed_peers` is `Some`, then filters for the epoch only for those peers. - pub fn synced_peers_for_epoch<'a>( - &'a self, - epoch: Epoch, - allowed_peers: Option<&'a HashSet>, - ) -> impl Iterator { + pub fn synced_peers_for_epoch(&self, epoch: Epoch) -> impl Iterator { self.peers .iter() - .filter(move |(peer_id, info)| { - allowed_peers.is_none_or(|allowed| allowed.contains(peer_id)) - && info.is_connected() + .filter(move |(_, info)| { + info.is_connected() && match info.sync_status() { SyncStatus::Synced { info } => { info.has_slot(epoch.end_slot(E::slots_per_epoch())) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index d5a4e9b73a..00597586b8 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -210,7 +210,7 @@ impl BackFillSync { .network_globals .peers .read() - .synced_peers_for_epoch(self.to_be_downloaded, None) + .synced_peers_for_epoch(self.to_be_downloaded) .next() .is_some() // backfill can't progress if we do not have peers in the required subnets post peerdas. 
@@ -313,7 +313,6 @@ impl BackFillSync { CouplingError::DataColumnPeerFailure { error, faulty_peers, - action, exceeded_retries, } => { debug!(?batch_id, error, "Block components coupling error"); @@ -325,11 +324,8 @@ impl BackFillSync { failed_columns.insert(*column); failed_peers.insert(*peer); } - for peer in failed_peers.iter() { - network.report_peer(*peer, *action, "failed to return columns"); - } - // Only retry if peer failure **and** retries have been exceeded + // Only retry if peer failure **and** retries haven't been exceeded if !*exceeded_retries { return self.retry_partial_batch( network, @@ -888,7 +884,7 @@ impl BackFillSync { .network_globals .peers .read() - .synced_peers_for_epoch(batch_id, None) + .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); @@ -899,6 +895,7 @@ impl BackFillSync { request, RangeRequestId::BackfillSync { batch_id }, &synced_peers, + &synced_peers, // All synced peers have imported up to the finalized slot so they must have their custody columns available &failed_peers, ) { Ok(request_id) => { @@ -964,7 +961,7 @@ impl BackFillSync { .network_globals() .peers .read() - .synced_peers_for_epoch(batch_id, None) + .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ffc79c1550..ba89d11225 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -2,7 +2,7 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; use lighthouse_network::{ - PeerAction, PeerId, + PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, }, @@ -63,7 +63,6 @@ pub(crate) enum CouplingError { DataColumnPeerFailure { error: String, faulty_peers: Vec<(ColumnIndex, PeerId)>, - action: PeerAction, exceeded_retries: bool, }, 
BlobPeerFailure(String), @@ -253,7 +252,6 @@ impl RangeBlockComponentsRequest { if let Err(CouplingError::DataColumnPeerFailure { error: _, faulty_peers, - action: _, exceeded_retries: _, }) = &resp { @@ -377,7 +375,6 @@ impl RangeBlockComponentsRequest { return Err(CouplingError::DataColumnPeerFailure { error: format!("No columns for block {block_root:?} with data"), faulty_peers: responsible_peers, - action: PeerAction::LowToleranceError, exceeded_retries, }); @@ -402,7 +399,6 @@ impl RangeBlockComponentsRequest { return Err(CouplingError::DataColumnPeerFailure { error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), faulty_peers: naughty_peers, - action: PeerAction::LowToleranceError, exceeded_retries }); } @@ -468,7 +464,7 @@ mod tests { NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, }; use lighthouse_network::{ - PeerAction, PeerId, + PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, DataColumnsByRangeRequestId, Id, RangeRequestId, @@ -785,7 +781,6 @@ mod tests { if let Err(super::CouplingError::DataColumnPeerFailure { error, faulty_peers, - action, exceeded_retries, }) = result { @@ -793,7 +788,6 @@ mod tests { assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing assert_eq!(faulty_peers[0].0, 3); // column index 3 assert_eq!(faulty_peers[1].0, 4); // column index 4 - assert!(matches!(action, PeerAction::LowToleranceError)); assert!(!exceeded_retries); // First attempt, should be false } else { panic!("Expected PeerFailure error"); @@ -957,13 +951,11 @@ mod tests { if let Err(super::CouplingError::DataColumnPeerFailure { error: _, faulty_peers, - action, exceeded_retries, }) = result { assert_eq!(faulty_peers.len(), 1); // column 2 missing assert_eq!(faulty_peers[0].0, 2); // column index 2 - assert!(matches!(action, PeerAction::LowToleranceError)); assert!(exceeded_retries); // Should be true after max retries } 
else { panic!("Expected PeerFailure error with exceeded_retries=true"); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ac2991c147..1d119cb2de 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -533,19 +533,21 @@ impl SyncNetworkContext { batch_type: ByRangeRequestType, request: BlocksByRangeRequest, requester: RangeRequestId, - peers: &HashSet, + block_peers: &HashSet, + column_peers: &HashSet, peers_to_deprioritize: &HashSet, ) -> Result { let range_request_span = debug_span!( parent: None, SPAN_OUTGOING_RANGE_REQUEST, range_req_id = %requester, - peers = peers.len() + block_peers = block_peers.len(), + column_peers = column_peers.len() ); let _guard = range_request_span.clone().entered(); let active_request_count_by_peer = self.active_request_count_by_peer(); - let Some(block_peer) = peers + let Some(block_peer) = block_peers .iter() .map(|peer| { ( @@ -579,7 +581,7 @@ impl SyncNetworkContext { .collect(); Some(self.select_columns_by_range_peers_to_request( &column_indexes, - peers, + column_peers, active_request_count_by_peer, peers_to_deprioritize, )?) @@ -770,7 +772,6 @@ impl SyncNetworkContext { let range_req = entry.get_mut(); if let Some(blocks_result) = range_req.responses(&self.chain.spec) { if let Err(CouplingError::DataColumnPeerFailure { - action: _, error, faulty_peers: _, exceeded_retries, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 31e6594139..c79800bfbe 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -334,6 +334,31 @@ impl BatchInfo { } } + /// Change the batch state from `Self::Downloading` to `Self::AwaitingDownload` without + /// registering a failed attempt. 
+ /// + /// Note: must use this cautiously with some level of retry protection + /// as not registering a failed attempt could lead to requesting in a loop. + #[must_use = "Batch may have failed"] + pub fn downloading_to_awaiting_download( + &mut self, + ) -> Result { + match self.state.poison() { + BatchState::Downloading(_) => { + self.state = BatchState::AwaitingDownload; + Ok(self.outcome()) + } + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) + } + } + } + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 3b816c0922..ab5b8bee5e 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -871,7 +871,6 @@ impl SyncingChain { CouplingError::DataColumnPeerFailure { error, faulty_peers, - action, exceeded_retries, } => { debug!(?batch_id, error, "Block components coupling error"); @@ -883,12 +882,22 @@ impl SyncingChain { failed_columns.insert(*column); failed_peers.insert(*peer); } - for peer in failed_peers.iter() { - network.report_peer(*peer, *action, "failed to return columns"); - } // Retry the failed columns if the column requests haven't exceeded the // max retries. Otherwise, remove treat it as a failed batch below. if !*exceeded_retries { + // Set the batch back to `AwaitingDownload` before retrying. + // This is to ensure that the batch doesn't get stuck in `Downloading` state. + // + // DataColumn retries have a retry limit so calling `downloading_to_awaiting_download` + // is safe. + if let BatchOperationOutcome::Failed { blacklist } = + batch.downloading_to_awaiting_download()?
+ { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } return self.retry_partial_batch( network, batch_id, @@ -936,7 +945,10 @@ impl SyncingChain { failing_batch: batch_id, }); } - self.send_batch(network, batch_id) + // The errored batch is set to AwaitingDownload above. + // We now just attempt to download all batches stuck in `AwaitingDownload` + // state in the right order. + self.attempt_send_awaiting_download_batches(network, "injecting error") } else { debug!( batch_epoch = %batch_id, @@ -969,7 +981,7 @@ impl SyncingChain { .collect(); debug!( ?awaiting_downloads, - src, "Attempting to send batches awaiting downlaod" + src, "Attempting to send batches awaiting download" ); for batch_id in awaiting_downloads { @@ -998,11 +1010,11 @@ impl SyncingChain { let (request, batch_type) = batch.to_blocks_by_range_request(); let failed_peers = batch.failed_peers(); - let synced_peers = network + let synced_column_peers = network .network_globals() .peers .read() - .synced_peers_for_epoch(batch_id, Some(&self.peers)) + .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); @@ -1013,7 +1025,13 @@ impl SyncingChain { chain_id: self.id, batch_id, }, - &synced_peers, + // Request blocks only from peers of this specific chain + &self.peers, + // Request columns from all synced peers, even if they are not part of this chain. + // This is to avoid splitting of good column peers across many head chains in a heavy forking + // environment. If the column peers and block peer are on different chains, then we return + // a coupling error and retry only the columns that failed to couple. See `Self::retry_partial_batch`.
+ &synced_column_peers, &failed_peers, ) { Ok(request_id) => { @@ -1081,7 +1099,7 @@ impl SyncingChain { .network_globals() .peers .read() - .synced_peers_for_epoch(batch_id, Some(&self.peers)) + .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); @@ -1093,6 +1111,8 @@ impl SyncingChain { &failed_columns, ) { Ok(_) => { + // inform the batch about the new request + batch.start_downloading(id)?; debug!( ?batch_id, id, "Retried column requests from different peers" @@ -1100,6 +1120,8 @@ impl SyncingChain { return Ok(KeepChain); } Err(e) => { + // No need to explicitly fail the batch since it's in `AwaitingDownload` state + // before we attempted to retry. debug!(?batch_id, id, e, "Failed to retry partial batch"); } } @@ -1123,6 +1145,9 @@ impl SyncingChain { ) -> Result { let _guard = self.span.clone().entered(); debug!("Resuming chain"); + // attempt to download any batches stuck in the `AwaitingDownload` state because of + // a lack of peers before. + self.attempt_send_awaiting_download_batches(network, "resume")?; // Request more batches if needed. self.request_batches(network)?; // If there is any batch ready for processing, send it.