From 90ff64381e894728e26447bb866047b6cc90dd15 Mon Sep 17 00:00:00 2001
From: Pawan Dhananjay
Date: Fri, 11 Jul 2025 17:02:30 -0700
Subject: [PATCH] Sync peer attribution (#7733)

Which issue # does this PR address?

Closes #7604

Improvements to range sync including:

1. Restrict column requests to peers that are part of the `SyncingChain`.
2. Attribute faults to the correct peer and downscore it if it doesn't return the data columns
   for the request.
3. Improve sync performance by retrying only the failed columns from other peers instead of
   failing the entire batch.
4. Use the `earliest_available_slot` to make requests to peers that claim to have the epoch.
   Note: if no `earliest_available_slot` info is available, fall back to the previous logic,
   i.e. assume the peer has everything backfilled up to the WS checkpoint/DA boundary.

Tested this on fusaka-devnet-2 with a full node and a supernode, and the recovery logic seems
to work well. Also tested this a little on mainnet. Need to do more testing and possibly add
some unit tests.
---
 .../src/peer_manager/peerdb.rs                     |  45 ++++
 .../src/peer_manager/peerdb/sync_status.rs         |  13 ++
 .../src/service/api_types.rs                       |   7 +
 .../src/sync/block_sidecar_coupling.rs             | 219 ++++++++++++++----
 beacon_node/network/src/sync/network_context.rs    | 115 ++++++++-
 beacon_node/network/src/sync/range_sync/batch.rs   |   2 +-
 beacon_node/network/src/sync/range_sync/chain.rs   |  82 ++++++-
 beacon_node/network/src/sync/tests/range.rs        |  46 ++--
 scripts/tests/genesis-sync-config-fulu.yaml        |   7 +-
 9 files changed, 437 insertions(+), 99 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index b28807c47e..6559b24724 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -248,6 +248,34 @@ impl<E: EthSpec> PeerDB<E> {
             .map(|(peer_id, _)| peer_id)
     }
 
+    /// Returns all the synced peers from the list of allowed peers that claim to have the block
+    /// components for the given epoch based on `status.earliest_available_slot`.
+    ///
+    /// If `earliest_available_slot` info is not available, then return the peer anyway, assuming
+    /// it has the required data.
+    pub fn synced_peers_for_epoch<'a>(
+        &'a self,
+        epoch: Epoch,
+        allowed_peers: &'a HashSet<PeerId>,
+    ) -> impl Iterator<Item = &'a PeerId> {
+        self.peers
+            .iter()
+            .filter(move |(peer_id, info)| {
+                allowed_peers.contains(peer_id)
+                    && info.is_connected()
+                    && match info.sync_status() {
+                        SyncStatus::Synced { info } => {
+                            info.has_slot(epoch.end_slot(E::slots_per_epoch()))
+                        }
+                        SyncStatus::Advanced { info } => {
+                            info.has_slot(epoch.end_slot(E::slots_per_epoch()))
+                        }
+                        _ => false,
+                    }
+            })
+            .map(|(peer_id, _)| peer_id)
+    }
+
     /// Gives the `peer_id` of all known connected and advanced peers.
     pub fn advanced_peers(&self) -> impl Iterator<Item = &PeerId> {
         self.peers
@@ -291,6 +319,23 @@
             .map(|(peer_id, _)| peer_id)
     }
 
+    /// Returns an iterator of all peers that are supposed to be custodying
+    /// the given subnet id that also belong to `allowed_peers`.
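+    ///
+    /// Unlike the generic `good_custody_subnet_peer`, this restricts candidates to the syncing
+    /// chain's own peers, so range sync column requests stay within the `SyncingChain`.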
+    pub fn good_range_sync_custody_subnet_peer<'a>(
+        &'a self,
+        subnet: DataColumnSubnetId,
+        allowed_peers: &'a HashSet<PeerId>,
+    ) -> impl Iterator<Item = &'a PeerId> {
+        self.peers
+            .iter()
+            .filter(move |(peer_id, info)| {
+                // The custody_subnets hashset can be populated via enr or metadata
+                let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
+                allowed_peers.contains(peer_id) && info.is_connected() && is_custody_subnet_peer
+            })
+            .map(|(peer_id, _)| peer_id)
+    }
+
     /// Gives the ids of all known disconnected peers.
     pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
         self.peers
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs
index 5a4fc33994..91e2156a27 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs
@@ -28,6 +28,19 @@ pub struct SyncInfo {
     pub earliest_available_slot: Option<Slot>,
 }
 
+impl SyncInfo {
+    /// Returns true if the provided slot is greater than or equal to the peer's
+    /// `earliest_available_slot`.
+    ///
+    /// If `earliest_available_slot` is None, then we just assume that the peer has the slot.
+    pub fn has_slot(&self, slot: Slot) -> bool {
+        if let Some(earliest_available_slot) = self.earliest_available_slot {
+            slot >= earliest_available_slot
+        } else {
+            true
+        }
+    }
+}
+
 impl std::cmp::PartialEq for SyncStatus {
     fn eq(&self, other: &Self) -> bool {
         matches!(
diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs
index b36f8cc215..3013596f9f 100644
--- a/beacon_node/lighthouse_network/src/service/api_types.rs
+++ b/beacon_node/lighthouse_network/src/service/api_types.rs
@@ -1,4 +1,5 @@
 use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
+use libp2p::PeerId;
 use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 use types::{
@@ -61,6 +62,11 @@ pub struct DataColumnsByRangeRequestId {
     pub id: Id,
     /// The Id of the overall By Range request for block components.
     pub parent_request_id: ComponentsByRangeRequestId,
+    /// The peer id associated with the request.
+    ///
+    /// This is useful to penalize the peer at a later point if it returned data columns that
+    /// did not match with the verified block.
+    pub peer: PeerId,
 }
 
 /// Block components by range request for range sync. Includes an ID for downstream consumers to
@@ -306,6 +312,7 @@ mod tests {
                 batch_id: Epoch::new(0),
             },
         },
+        peer: PeerId::random(),
     };
     assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
 }
diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs
index 0418ab4553..4653daa44a 100644
--- a/beacon_node/network/src/sync/block_sidecar_coupling.rs
+++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs
@@ -1,15 +1,17 @@
 use beacon_chain::{
     block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn,
     get_block_root,
 };
-use lighthouse_network::service::api_types::{
-    BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
+use lighthouse_network::{
+    service::api_types::{
+        BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
+    },
+    PeerAction, PeerId,
 };
 use std::{collections::HashMap, sync::Arc};
 use types::{
     BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
     Hash256, RuntimeVariableList, SignedBeaconBlock,
 };
-
 pub struct RangeBlockComponentsRequest<E: EthSpec> {
     /// Blocks we have received awaiting for their corresponding sidecar.
     blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
@@ -30,24 +32,38 @@ enum RangeBlockDataRequest<E: EthSpec> {
             DataColumnsByRangeRequestId,
             ByRangeRequest<DataColumnsByRangeRequestId, DataColumnSidecarList<E>>,
         >,
+        /// The column indices corresponding to the request
+        column_peers: HashMap<DataColumnsByRangeRequestId, Vec<ColumnIndex>>,
         expected_custody_columns: Vec<ColumnIndex>,
     },
 }
 
+#[derive(Debug)]
+pub struct CouplingError {
+    pub(crate) msg: String,
+    pub(crate) column_and_peer: Option<(Vec<(ColumnIndex, PeerId)>, PeerAction)>,
+}
+
 impl<E: EthSpec> RangeBlockComponentsRequest<E> {
+    #[allow(clippy::type_complexity)]
     pub fn new(
         blocks_req_id: BlocksByRangeRequestId,
         blobs_req_id: Option<BlobsByRangeRequestId>,
-        data_columns: Option<(Vec<DataColumnsByRangeRequestId>, Vec<ColumnIndex>)>,
+        data_columns: Option<(
+            Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
+            Vec<ColumnIndex>,
+        )>,
     ) -> Self {
         let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
             RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id))
         } else if let Some((requests, expected_custody_columns)) = data_columns {
+            let column_peers: HashMap<_, _> = requests.into_iter().collect();
             RangeBlockDataRequest::DataColumns {
-                requests: requests
-                    .into_iter()
-                    .map(|id| (id, ByRangeRequest::Active(id)))
+                requests: column_peers
+                    .keys()
+                    .map(|id| (*id, ByRangeRequest::Active(*id)))
                     .collect(),
+                column_peers,
                 expected_custody_columns,
             }
         } else {
@@ -60,6 +76,28 @@
         }
     }
 
+    /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed
+    /// request for some columns.
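+    ///
+    /// Used by the retry path in `network_context.rs`: after the failed columns have been
+    /// re-requested from other peers, the replacement request ids are inserted here so the
+    /// coupling logic waits on them instead of the failed requests.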
+    pub fn reinsert_failed_column_requests(
+        &mut self,
+        failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
+    ) -> Result<(), String> {
+        match &mut self.block_data_request {
+            RangeBlockDataRequest::DataColumns {
+                requests,
+                expected_custody_columns: _,
+                column_peers,
+            } => {
+                for (request, columns) in failed_column_requests.into_iter() {
+                    requests.insert(request, ByRangeRequest::Active(request));
+                    column_peers.insert(request, columns);
+                }
+                Ok(())
+            }
+            _ => Err("not a column request".to_string()),
+        }
+    }
+
     pub fn add_blocks(
         &mut self,
         req_id: BlocksByRangeRequestId,
@@ -105,12 +143,15 @@
         }
     }
 
-    pub fn responses(&self, spec: &ChainSpec) -> Option<Result<Vec<RpcBlock<E>>, String>> {
+    pub fn responses(
+        &mut self,
+        spec: &ChainSpec,
+    ) -> Option<Result<Vec<RpcBlock<E>>, CouplingError>> {
         let Some(blocks) = self.blocks_request.to_finished() else {
             return None;
         };
 
-        match &self.block_data_request {
+        match &mut self.block_data_request {
             RangeBlockDataRequest::NoData => {
                 Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
             }
@@ -127,8 +168,10 @@
             RangeBlockDataRequest::DataColumns {
                 requests,
                 expected_custody_columns,
+                column_peers,
             } => {
                 let mut data_columns = vec![];
+                let mut column_to_peer_id: HashMap<ColumnIndex, PeerId> = HashMap::new();
                 for req in requests.values() {
                     let Some(data) = req.to_finished() else {
                         return None;
                     };
                     data_columns.extend(data.clone())
                 }
 
-                Some(Self::responses_with_custody_columns(
+                // Note: this assumes that only 1 peer is responsible for a column
+                // within a batch.
+                for (id, columns) in column_peers {
+                    for column in columns {
+                        column_to_peer_id.insert(*column, id.peer);
+                    }
+                }
+
+                let resp = Self::responses_with_custody_columns(
                     blocks.to_vec(),
                     data_columns,
+                    column_to_peer_id,
                     expected_custody_columns,
                     spec,
-                ))
+                );
+
+                if let Err(err) = &resp {
+                    if let Some((peers, _)) = &err.column_and_peer {
+                        for (_, peer) in peers.iter() {
+                            // find the req id associated with the peer and
+                            // delete it from the entries
+                            requests.retain(|&k, _| k.peer != *peer);
+                        }
+                    }
+                }
+
+                Some(resp)
             }
         }
     }
@@ -150,7 +214,7 @@
         blocks: Vec<Arc<SignedBeaconBlock<E>>>,
         blobs: Vec<Arc<BlobSidecar<E>>>,
         spec: &ChainSpec,
-    ) -> Result<Vec<RpcBlock<E>>, String> {
+    ) -> Result<Vec<RpcBlock<E>>, CouplingError> {
         // There can't be more more blobs than blocks. i.e. sending any blob (empty
         // included) for a skipped slot is not permitted.
         let mut responses = Vec::with_capacity(blocks.len());
@@ -165,17 +229,26 @@
                 .unwrap_or(false);
             pair_next_blob
         } {
-            blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?);
+            blob_list.push(blob_iter.next().ok_or_else(|| CouplingError {
+                msg: "Missing next blob".to_string(),
+                column_and_peer: None,
+            })?);
         }
 
         let mut blobs_buffer = vec![None; max_blobs_per_block];
         for blob in blob_list {
             let blob_index = blob.index as usize;
             let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
-                return Err("Invalid blob index".to_string());
+                return Err(CouplingError {
+                    msg: "Invalid blob index".to_string(),
+                    column_and_peer: None,
+                });
             };
             if blob_opt.is_some() {
-                return Err("Repeat blob index".to_string());
+                return Err(CouplingError {
+                    msg: "Repeat blob index".to_string(),
+                    column_and_peer: None,
+                });
             } else {
                 *blob_opt = Some(blob);
             }
@@ -184,13 +257,24 @@
                 blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
                 max_blobs_per_block,
             )
-            .map_err(|_| "Blobs returned exceeds max length".to_string())?;
-            responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?)
+            .map_err(|_| CouplingError {
+                msg: "Blobs returned exceeds max length".to_string(),
+                column_and_peer: None,
+            })?;
+            responses.push(
+                RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError {
+                    msg: format!("{e:?}"),
+                    column_and_peer: None,
+                })?,
+            )
         }
 
         // if accumulated sidecars is not empty, throw an error.
         if blob_iter.next().is_some() {
-            return Err("Received sidecars that don't pair well".to_string());
+            return Err(CouplingError {
+                msg: "Received sidecars that don't pair well".to_string(),
+                column_and_peer: None,
+            });
         }
 
         Ok(responses)
@@ -199,9 +283,10 @@ fn responses_with_custody_columns(
         blocks: Vec<Arc<SignedBeaconBlock<E>>>,
         data_columns: DataColumnSidecarList<E>,
+        column_to_peer: HashMap<ColumnIndex, PeerId>,
         expects_custody_columns: &[ColumnIndex],
         spec: &ChainSpec,
-    ) -> Result<Vec<RpcBlock<E>>, String> {
+    ) -> Result<Vec<RpcBlock<E>>, CouplingError> {
         // Group data columns by block_root and index
         let mut data_columns_by_block =
             HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
@@ -215,9 +300,10 @@
                 .insert(index, column)
                 .is_some()
             {
-                return Err(format!(
-                    "Repeated column block_root {block_root:?} index {index}"
-                ));
+                return Err(CouplingError {
+                    msg: format!("Repeated column block_root {block_root:?} index {index}"),
+                    column_and_peer: None,
+                });
             }
         }
 
@@ -235,30 +321,61 @@
                 // TODO(das): on the initial version of PeerDAS the beacon chain does not check
                 // rpc custody requirements and dropping this check can allow the block to have
                 // an inconsistent DB.
-                return Err(format!("No columns for block {block_root:?} with data"));
+
+                // For now, we always assume that the block peer is right.
+                // This is potentially dangerous as we can get isolated on a chain with a
+                // malicious block peer.
+                // TODO: fix this by checking the proposer signature before downloading columns.
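+                // We cannot tell which column peer withheld data in this case, so every
+                // peer that served columns for this batch is reported and the columns are
+                // re-requested from other peers.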
+                let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
+                return Err(CouplingError {
+                    msg: format!("No columns for block {block_root:?} with data"),
+                    column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)),
+                });
             };
 
             let mut custody_columns = vec![];
+            let mut naughty_peers = vec![];
             for index in expects_custody_columns {
-                let Some(data_column) = data_columns_by_index.remove(index) else {
-                    return Err(format!("No column for block {block_root:?} index {index}"));
-                };
-                // Safe to convert to `CustodyDataColumn`: we have asserted that the index of
-                // this column is in the set of `expects_custody_columns` and with the expected
-                // block root, so for the expected epoch of this batch.
-                custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
+                if let Some(data_column) = data_columns_by_index.remove(index) {
+                    // Safe to convert to `CustodyDataColumn`: we have asserted that the index of
+                    // this column is in the set of `expects_custody_columns` and with the expected
+                    // block root, so for the expected epoch of this batch.
+                    custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
+                } else {
+                    // Penalize the peer for claiming to have the columns but not returning
+                    // them
+                    let Some(responsible_peer) = column_to_peer.get(index) else {
+                        return Err(CouplingError {
+                            msg: format!("Internal error, no request made for column {}", index),
+                            column_and_peer: None,
+                        });
+                    };
+                    naughty_peers.push((*index, *responsible_peer));
+                }
+            }
+            if !naughty_peers.is_empty() {
+                return Err(CouplingError {
+                    msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
+                    column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)),
+                });
             }
 
             // Assert that there are no columns left
             if !data_columns_by_index.is_empty() {
                 let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
-                return Err(format!(
-                    "Not all columns consumed for block {block_root:?}: {remaining_indices:?}"
-                ));
+                // log the error but don't return an error, we can still progress with extra columns.
+                tracing::error!(
+                    ?block_root,
+                    ?remaining_indices,
+                    "Not all columns consumed for block"
+                );
             }
 
             RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec)
-                .map_err(|e| format!("{e:?}"))?
+                .map_err(|e| CouplingError {
+                    msg: format!("{:?}", e),
+                    column_and_peer: None,
+                })?
         } else {
             // Block has no data, expects zero columns
             RpcBlock::new_without_blobs(Some(block_root), block)
@@ -268,7 +385,9 @@
         // Assert that there are no columns left for other blocks
         if !data_columns_by_block.is_empty() {
             let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
-            return Err(format!("Not all columns consumed: {remaining_roots:?}"));
+            // log the error but don't return an error, we can still progress with responses.
+            // this is most likely an internal error with overrequesting or a client bug.
+            tracing::error!(?remaining_roots, "Not all columns consumed for block");
         }
 
         Ok(rpc_blocks)
@@ -303,9 +422,12 @@ mod tests {
     use beacon_chain::test_utils::{
        generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs,
     };
-    use lighthouse_network::service::api_types::{
-        BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
-        DataColumnsByRangeRequestId, Id, RangeRequestId,
+    use lighthouse_network::{
+        service::api_types::{
+            BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
+            DataColumnsByRangeRequestId, Id, RangeRequestId,
+        },
+        PeerId,
     };
     use rand::SeedableRng;
     use std::sync::Arc;
@@ -342,10 +464,11 @@
         DataColumnsByRangeRequestId {
             id,
             parent_request_id,
+            peer: PeerId::random(),
         }
     }
 
-    fn is_finished<E: EthSpec>(info: &RangeBlockComponentsRequest<E>) -> bool {
+    fn is_finished<E: EthSpec>(info: &mut RangeBlockComponentsRequest<E>) -> bool {
         let spec = test_spec::<E>();
         info.responses(&spec).is_some()
     }
@@ -428,7 +551,7 @@
         let columns_req_id = expects_custody_columns
             .iter()
             .enumerate()
-            .map(|(i, _)| columns_id(i as Id, components_id))
+            .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column]))
             .collect::<Vec<_>>();
         let mut info = RangeBlockComponentsRequest::<E>::new(
             blocks_req_id,
@@ -442,12 +565,13 @@
         )
         .unwrap();
         // Assert response is not finished
-        assert!(!is_finished(&info));
+        assert!(!is_finished(&mut info));
 
         // Send data columns
         for (i, &column_index) in expects_custody_columns.iter().enumerate() {
+            let (req, _columns) = columns_req_id.get(i).unwrap();
             info.add_custody_columns(
-                columns_req_id.get(i).copied().unwrap(),
+                *req,
                 blocks
                     .iter()
                     .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
@@ -457,7 +581,7 @@
 
             if i < expects_custody_columns.len() - 1 {
                 assert!(
-                    !is_finished(&info),
+                    !is_finished(&mut info),
                     "requested should not be finished at loop {i}"
                 );
             }
@@ -485,7 +609,7 @@
         let columns_req_id = batched_column_requests
             .iter()
             .enumerate()
-            .map(|(i, _)| columns_id(i as Id, components_id))
+            .map(|(i, columns)| (columns_id(i as Id, components_id), columns.clone()))
             .collect::<Vec<_>>();
 
         let mut info = RangeBlockComponentsRequest::<E>::new(
@@ -513,12 +637,13 @@
         )
         .unwrap();
         // Assert response is not finished
-        assert!(!is_finished(&info));
+        assert!(!is_finished(&mut info));
 
         for (i, column_indices) in batched_column_requests.iter().enumerate() {
+            let (req, _columns) = columns_req_id.get(i).unwrap();
             // Send the set of columns in the same batch request
             info.add_custody_columns(
-                columns_req_id.get(i).copied().unwrap(),
+                *req,
                 blocks
                     .iter()
                     .flat_map(|b| {
@@ -532,7 +657,7 @@
 
             if i < num_of_data_column_requests - 1 {
                 assert!(
-                    !is_finished(&info),
+                    !is_finished(&mut info),
                     "requested should not be finished at loop {i}"
                 );
             }
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 2f74bdc733..a62b8f7382 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -14,6 +14,7 @@
 use crate::network_beacon_processor::TestBeaconChainType;
 use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::block_lookups::SingleLookupId;
+use crate::sync::block_sidecar_coupling::CouplingError;
 use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
@@ -81,7 +82,7 @@ pub enum RpcResponseError {
     RpcError(#[allow(dead_code)] RPCError),
     VerifyError(LookupVerifyError),
     CustodyRequestError(#[allow(dead_code)] CustodyRequestError),
-    BlockComponentCouplingError(#[allow(dead_code)] String),
+    BlockComponentCouplingError(CouplingError),
 }
 
 #[derive(Debug, PartialEq, Eq)]
@@ -441,6 +442,79 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         active_request_count_by_peer
     }
 
+    /// Retries only the specified failed columns by requesting them again.
+    ///
+    /// Note: This function doesn't retry the whole batch, but retries specific requests within
+    /// the batch.
+    pub fn retry_columns_by_range(
+        &mut self,
+        request_id: Id,
+        peers: &HashSet<PeerId>,
+        peers_to_deprioritize: &HashSet<PeerId>,
+        request: BlocksByRangeRequest,
+        failed_columns: &HashSet<ColumnIndex>,
+    ) -> Result<(), String> {
+        let Some(requester) = self.components_by_range_requests.keys().find_map(|r| {
+            if r.id == request_id {
+                Some(r.requester)
+            } else {
+                None
+            }
+        }) else {
+            return Err("request id not present".to_string());
+        };
+
+        let active_request_count_by_peer = self.active_request_count_by_peer();
+
+        debug!(
+            ?failed_columns,
+            "Retrying only failed column requests from other peers"
+        );
+
+        // Attempt to find all required custody peers to request the failed columns from
+        let columns_by_range_peers_to_request = self
+            .select_columns_by_range_peers_to_request(
+                failed_columns,
+                peers,
+                active_request_count_by_peer,
+                peers_to_deprioritize,
+            )
+            .map_err(|e| format!("{:?}", e))?;
+
+        // Reuse the id for the request that received partially correct responses
+        let id = ComponentsByRangeRequestId {
+            id: request_id,
+            requester,
+        };
+
+        let data_column_requests = columns_by_range_peers_to_request
+            .into_iter()
+            .map(|(peer_id, columns)| {
+                self.send_data_columns_by_range_request(
+                    peer_id,
+                    DataColumnsByRangeRequest {
+                        start_slot: *request.start_slot(),
+                        count: *request.count(),
+                        columns,
+                    },
+                    id,
+                )
+            })
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| format!("{:?}", e))?;
+
+        // instead of creating a new `RangeBlockComponentsRequest`, we reinsert
+        // the new requests created for the failed columns
+        let Some(range_request) = self.components_by_range_requests.get_mut(&id) else {
+            return Err(
+                "retrying custody request for range request that does not exist".to_string(),
+            );
+        };
+
+        range_request.reinsert_failed_column_requests(data_column_requests)?;
+        Ok(())
+    }
+
     /// A blocks by range request sent by the range sync algorithm
     pub fn block_components_by_range_request(
         &mut self,
@@ -619,20 +693,31 @@
         let request = entry.get_mut();
         match range_block_component {
             RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
-                request
-                    .add_blocks(req_id, blocks)
-                    .map_err(RpcResponseError::BlockComponentCouplingError)
+                request.add_blocks(req_id, blocks).map_err(|e| {
+                    RpcResponseError::BlockComponentCouplingError(CouplingError {
+                        msg: e,
+                        column_and_peer: None,
+                    })
+                })
             }),
             RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
-                request
-                    .add_blobs(req_id, blobs)
-                    .map_err(RpcResponseError::BlockComponentCouplingError)
+                request.add_blobs(req_id, blobs).map_err(|e| {
+                    RpcResponseError::BlockComponentCouplingError(CouplingError {
+                        msg: e,
+                        column_and_peer: None,
+                    })
+                })
             }),
             RangeBlockComponent::CustodyColumns(req_id, resp) => {
                 resp.and_then(|(custody_columns, _)| {
                     request
                         .add_custody_columns(req_id, custody_columns)
-                        .map_err(RpcResponseError::BlockComponentCouplingError)
+                        .map_err(|e| {
+                            RpcResponseError::BlockComponentCouplingError(CouplingError {
+                                msg: e,
+                                column_and_peer: None,
+                            })
+                        })
                 })
             }
         }
@@ -641,8 +726,12 @@
             return Some(Err(e));
         }
 
-        if let Some(blocks_result) = entry.get().responses(&self.chain.spec) {
-            entry.remove();
+        if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) {
+            if blocks_result.is_ok() {
+                // remove the entry only if it coupled successfully with
+                // no errors
+                entry.remove();
+            }
             // If the request is finished, dequeue everything
             Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError))
         } else {
@@ -1075,10 +1164,12 @@
         peer_id: PeerId,
         request: DataColumnsByRangeRequest,
         parent_request_id: ComponentsByRangeRequestId,
-    ) -> Result<DataColumnsByRangeRequestId, RpcRequestSendError> {
+    ) -> Result<(DataColumnsByRangeRequestId, Vec<ColumnIndex>), RpcRequestSendError> {
+        let requested_columns = request.columns.clone();
         let id = DataColumnsByRangeRequestId {
             id: self.next_id(),
             parent_request_id,
+            peer: peer_id,
         };
 
         self.send_network_msg(NetworkMessage::SendRequest {
@@ -1106,7 +1197,7 @@
             false,
             DataColumnsByRangeRequestItems::new(request),
         );
-        Ok(id)
+        Ok((id, requested_columns))
     }
 
     pub fn is_execution_engine_online(&self) -> bool {
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 264f83ee82..e31930075a 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -89,6 +89,7 @@
     Failed { blacklist: bool },
 }
 
+#[derive(Debug)]
 pub enum BatchProcessingResult {
     Success,
     FaultyFailure,
@@ -364,7 +365,6 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
         }
     }
 
-    #[must_use = "Batch may have failed"]
     pub fn processing_completed(
         &mut self,
         procesing_result: BatchProcessingResult,
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index e3794bd2be..0e9178f0f8 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -2,6 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
 use super::RangeSyncType;
 use crate::metrics;
 use crate::network_beacon_processor::ChainSegmentProcessId;
+use crate::sync::block_sidecar_coupling::CouplingError;
 use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
 use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
 use beacon_chain::block_verification_types::RpcBlock;
@@ -12,7 +13,7 @@ use logging::crit;
 use std::collections::{btree_map::Entry, BTreeMap, HashSet};
 use strum::IntoStaticStr;
 use tracing::{debug, instrument, warn};
-use types::{Epoch, EthSpec, Hash256, Slot};
+use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot};
 
 /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
 /// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -826,11 +827,37 @@
     ) -> ProcessingResult {
         let batch_state = self.visualize_batch_state();
         if let Some(batch) = self.batches.get_mut(&batch_id) {
+            if let RpcResponseError::BlockComponentCouplingError(CouplingError {
+                column_and_peer,
+                msg,
+            }) = &err
+            {
+                debug!(?batch_id, msg, "Block components coupling error");
+                // Note: we don't fail the batch here because a `CouplingError` is
+                // recoverable by requesting from other honest peers.
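+                // Downscore each peer that failed to return its columns, then re-request
+                // only those columns from other synced peers; the rest of the batch is kept.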
+                if let Some((column_and_peer, action)) = column_and_peer {
+                    let mut failed_columns = HashSet::new();
+                    let mut failed_peers = HashSet::new();
+                    for (column, peer) in column_and_peer {
+                        failed_columns.insert(*column);
+                        failed_peers.insert(*peer);
+                    }
+                    for peer in failed_peers.iter() {
+                        network.report_peer(*peer, *action, "failed to return columns");
+                    }
+
+                    return self.retry_partial_batch(
+                        network,
+                        batch_id,
+                        request_id,
+                        failed_columns,
+                        failed_peers,
+                    );
+                }
+            }
             // A batch could be retried without the peer failing the request (disconnecting/
             // sending an error /timeout) if the peer is removed from the chain for other
             // reasons. Check that this block belongs to the expected peer
-            // TODO(das): removed peer_id matching as the node may request a different peer for data
-            // columns.
             if !batch.is_expecting_block(&request_id) {
                 debug!(
                     batch_epoch = %batch_id,
@@ -891,7 +918,7 @@
                 .network_globals()
                 .peers
                 .read()
-                .synced_peers()
+                .synced_peers_for_epoch(batch_id, &self.peers)
                 .cloned()
                 .collect::<HashSet<_>>();
 
@@ -951,6 +978,50 @@
         Ok(KeepChain)
     }
 
+    /// Retries partial column requests within the batch by creating new requests for the failed
+    /// columns.
+    #[instrument(parent = None, fields(chain = self.id, service = "range_sync"), skip_all)]
+    pub fn retry_partial_batch(
+        &mut self,
+        network: &mut SyncNetworkContext<T>,
+        batch_id: BatchId,
+        id: Id,
+        failed_columns: HashSet<ColumnIndex>,
+        mut failed_peers: HashSet<PeerId>,
+    ) -> ProcessingResult {
+        if let Some(batch) = self.batches.get_mut(&batch_id) {
+            failed_peers.extend(&batch.failed_peers());
+            let req = batch.to_blocks_by_range_request().0;
+
+            let synced_peers = network
+                .network_globals()
+                .peers
+                .read()
+                .synced_peers()
+                .cloned()
+                .collect::<HashSet<_>>();
+
+            match network.retry_columns_by_range(
+                id,
+                &synced_peers,
+                &failed_peers,
+                req,
+                &failed_columns,
+            ) {
+                Ok(_) => {
+                    debug!(
+                        ?batch_id,
+                        id, "Retried column requests from different peers"
+                    );
+                    return Ok(KeepChain);
+                }
+                Err(e) => {
+                    debug!(?batch_id, id, e, "Failed to retry partial batch");
+                }
+            }
+        }
+        Ok(KeepChain)
+    }
+
     /// Returns true if this chain is currently syncing.
     pub fn is_syncing(&self) -> bool {
         match self.state {
@@ -1031,9 +1102,8 @@
                 .network_globals()
                 .peers
                 .read()
-                .good_custody_subnet_peer(*subnet_id)
+                .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers)
                 .count();
-
             peer_count > 0
         });
 
         peers_on_all_custody_subnets
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index fa1e057765..7c184d3b39 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -77,7 +77,7 @@ impl TestRig {
     /// Produce a head peer with an advanced head
     fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
         let local_info = self.local_info();
-        self.add_random_peer(SyncInfo {
+        self.add_supernode_peer(SyncInfo {
             head_root,
             head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
             ..local_info
@@ -93,7 +93,7 @@
     fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
         let local_info = self.local_info();
         let finalized_epoch = local_info.finalized_epoch + 2;
-        self.add_random_peer(SyncInfo {
+        self.add_supernode_peer(SyncInfo {
             finalized_epoch,
             finalized_root,
             head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
@@ -132,13 +132,13 @@
         }
     }
 
-    fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId {
+    fn add_fullnode_peer(&mut self, remote_info: SyncInfo) -> PeerId {
         let peer_id = self.new_connected_peer();
         self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
         peer_id
     }
 
-    fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId {
+    fn add_supernode_peer(&mut self, remote_info: SyncInfo) -> PeerId {
         // Create valid peer known to network globals
         // TODO(fulu): Using supernode peers to ensure we have peer across all column
         // subnets for syncing. Should add tests connecting to full node peers.
@@ -148,17 +148,13 @@
-    fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) {
-        for _ in 0..count {
+    fn add_fullnode_peers(&mut self, remote_info: SyncInfo, peer_count: usize) {
+        for _ in 0..peer_count {
             let peer = self.new_connected_peer();
-            self.add_peer(peer, remote_info.clone());
+            self.send_sync_message(SyncMessage::AddPeer(peer, remote_info.clone()));
         }
     }
 
-    fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) {
-        self.send_sync_message(SyncMessage::AddPeer(peer, remote_info));
-    }
-
     fn assert_state(&self, state: RangeSyncType) {
         assert_eq!(
             self.sync_manager
@@ -562,19 +558,14 @@ const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
 fn finalized_sync_enough_global_custody_peers_few_chain_peers() {
     // Run for all forks
     let mut r = TestRig::test_setup();
-    // This test creates enough global custody peers to satisfy column queries but only adds few
-    // peers to the chain
-    r.new_connected_peers_for_peerdas();
 
     let advanced_epochs: u64 = 2;
     let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
-    // Current priorization only sends batches to idle peers, so we need enough peers for each batch
-    // TODO: Test this with a single peer in the chain, it should still work
-    r.add_random_peers(
-        remote_info,
-        (advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize,
-    );
+    // Generate enough peers and supernodes to cover all custody columns
+    let peer_count = 100;
+    r.add_fullnode_peers(remote_info.clone(), peer_count);
+    r.add_supernode_peer(remote_info);
     r.assert_state(RangeSyncType::Finalized);
 
     let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
@@ -592,9 +583,9 @@ fn finalized_sync_not_enough_custody_peers_on_start() {
     let advanced_epochs: u64 = 2;
     let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
-    // Unikely that the single peer we added has enough columns for us. Tests are determinstic and
+    // Unlikely that the single peer we added has enough columns for us. Tests are deterministic and
     // this error should never be hit
-    r.add_random_peer_not_supernode(remote_info.clone());
+    r.add_fullnode_peer(remote_info.clone());
     r.assert_state(RangeSyncType::Finalized);
 
     // Because we don't have enough peers on all columns we haven't sent any request.
@@ -603,14 +594,9 @@
     r.expect_empty_network();
 
     // Generate enough peers and supernodes to cover all custody columns
-    r.new_connected_peers_for_peerdas();
-    // Note: not necessary to add this peers to the chain, as we draw from the global pool
-    // We still need to add enough peers to trigger batch downloads with idle peers. Same issue as
-    // the test above.
-    r.add_random_peers(
-        remote_info,
-        (advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize,
-    );
+    let peer_count = 100;
+    r.add_fullnode_peers(remote_info.clone(), peer_count);
+    r.add_supernode_peer(remote_info);
 
     let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
     r.complete_and_process_range_sync_until(last_epoch, filter());
diff --git a/scripts/tests/genesis-sync-config-fulu.yaml b/scripts/tests/genesis-sync-config-fulu.yaml
index 91aa4d1ffd..b25ac0a704 100644
--- a/scripts/tests/genesis-sync-config-fulu.yaml
+++ b/scripts/tests/genesis-sync-config-fulu.yaml
@@ -3,19 +3,20 @@ participants:
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-1
+    el_image: ethpandaops/geth:fusaka-devnet-2
+    supernode: true
     count: 2
 # nodes without validators, used for testing sync.
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-1
+    el_image: ethpandaops/geth:fusaka-devnet-2
     supernode: true
     validator_count: 0
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-1
+    el_image: ethpandaops/geth:fusaka-devnet-2
     supernode: false
     validator_count: 0
 network_params:
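
A minimal, standalone sketch of the `earliest_available_slot` fallback that point 4 above
relies on. The mock type and `u64` slots here are illustrative only; the real implementation
is `SyncInfo::has_slot` in `peerdb/sync_status.rs`:

    /// Mock of the peer's sync info; the real `SyncInfo` carries more fields.
    struct MockSyncInfo {
        earliest_available_slot: Option<u64>,
    }

    impl MockSyncInfo {
        /// A peer is a candidate for a slot if it claims to have backfilled at
        /// least that far. With no info we assume full backfill, which matches
        /// the pre-existing request logic.
        fn has_slot(&self, slot: u64) -> bool {
            match self.earliest_available_slot {
                Some(earliest) => slot >= earliest,
                None => true,
            }
        }
    }

    fn main() {
        let pruned = MockSyncInfo { earliest_available_slot: Some(1000) };
        let unknown = MockSyncInfo { earliest_available_slot: None };
        assert!(!pruned.has_slot(999)); // peer pruned this slot: skip it
        assert!(pruned.has_slot(1000)); // peer claims to have it
        assert!(unknown.has_slot(0)); // no info: assume fully backfilled
    }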