From 2a87016d94fdee29f94400151ecadccf28e9d082 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 20 May 2024 20:27:57 +0200 Subject: [PATCH] Fix lookup disconnect peer (#5815) * Test lookup peer disconnect modes * Fix lookup peer disconnected return early --- .../network/src/sync/block_lookups/mod.rs | 15 ++- .../sync/block_lookups/single_block_lookup.rs | 105 +++++------------- .../network/src/sync/block_lookups/tests.rs | 24 +++- 3 files changed, 60 insertions(+), 84 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 79e95e4c8c..0e89eb956c 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -191,7 +191,7 @@ impl BlockLookups { .iter() .find(|(_, l)| l.block_root() == block_to_drop) { - for &peer_id in lookup.all_used_peers() { + for &peer_id in lookup.all_peers() { cx.report_peer( peer_id, PeerAction::LowToleranceError, @@ -387,8 +387,15 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId) { self.single_block_lookups.retain(|_, lookup| { - if lookup.remove_peer(peer_id) { - debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root()); + lookup.remove_peer(peer_id); + + // Note: this condition should be removed in the future. It's not strictly necessary to drop a + // lookup if there are no peers left. Lookup should only be dropped if it can not make progress + if lookup.has_no_peers() { + debug!(self.log, + "Dropping single lookup after peer disconnection"; + "block_root" => ?lookup.block_root() + ); false } else { true @@ -545,7 +552,7 @@ impl BlockLookups { lookup.continue_requests(cx) } Action::ParentUnknown { parent_root } => { - let peers = lookup.all_available_peers().cloned().collect::>(); + let peers = lookup.all_peers().copied().collect::>(); lookup.set_awaiting_parent(parent_root); debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 28ac0378b3..f587a98254 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -7,7 +7,6 @@ use crate::sync::network_context::{ }; use beacon_chain::BeaconChainTypes; use derivative::Derivative; -use itertools::Itertools; use rand::seq::IteratorRandom; use std::collections::HashSet; use std::fmt::Debug; @@ -64,6 +63,9 @@ pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, + /// Peers that claim to have imported this set of block components + #[derivative(Debug(format_with = "fmt_peer_set_as_len"))] + peers: HashSet, block_root: Hash256, awaiting_parent: Option, created: Instant, @@ -78,8 +80,9 @@ impl SingleBlockLookup { ) -> Self { Self { id, - block_request_state: BlockRequestState::new(requested_block_root, peers), - blob_request_state: BlobRequestState::new(requested_block_root, peers), + block_request_state: BlockRequestState::new(requested_block_root), + blob_request_state: BlobRequestState::new(requested_block_root), + peers: HashSet::from_iter(peers.iter().copied()), block_root: requested_block_root, awaiting_parent, created: Instant::now(), @@ -134,22 +137,9 @@ impl SingleBlockLookup { self.block_root() == block_root } - /// Get all unique used peers across block and blob requests. - pub fn all_used_peers(&self) -> impl Iterator + '_ { - self.block_request_state - .state - .get_used_peers() - .chain(self.blob_request_state.state.get_used_peers()) - .unique() - } - - /// Get all unique available peers across block and blob requests. - pub fn all_available_peers(&self) -> impl Iterator + '_ { - self.block_request_state - .state - .get_available_peers() - .chain(self.blob_request_state.state.get_available_peers()) - .unique() + /// Get all unique peers that claim to have imported this set of block components + pub fn all_peers(&self) -> impl Iterator + '_ { + self.peers.iter() } /// Makes progress on all requests of this lookup. Any error is not recoverable and must result @@ -198,7 +188,7 @@ impl SingleBlockLookup { return Err(LookupRequestError::TooManyAttempts { cannot_process }); } - let Some(peer_id) = request.get_state_mut().use_rand_available_peer() else { + let Some(peer_id) = self.use_rand_available_peer() else { if awaiting_parent { // Allow lookups awaiting for a parent to have zero peers. If when the parent // resolve they still have zero peers the lookup will fail gracefully. @@ -208,6 +198,7 @@ impl SingleBlockLookup { } }; + let request = R::request_state_mut(self); match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { LookupRequestResult::RequestSent(req_id) => { request.get_state_mut().on_download_start(req_id)? @@ -238,9 +229,7 @@ impl SingleBlockLookup { /// Add peer to all request states. The peer must be able to serve this request. /// Returns true if the peer was newly inserted into some request state. pub fn add_peer(&mut self, peer_id: PeerId) -> bool { - let inserted_block = self.block_request_state.state.add_peer(&peer_id); - let inserted_blob = self.blob_request_state.state.add_peer(&peer_id); - inserted_block || inserted_blob + self.peers.insert(peer_id) } /// Returns true if the block has already been downloaded. @@ -252,8 +241,17 @@ impl SingleBlockLookup { /// Remove peer from available peers. Return true if there are no more available peers and all /// requests are not expecting any future event (AwaitingDownload). pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool { - self.block_request_state.state.remove_peer(peer_id) - && self.blob_request_state.state.remove_peer(peer_id) + self.peers.remove(peer_id) + } + + /// Returns true if this lookup has zero peers + pub fn has_no_peers(&self) -> bool { + self.peers.is_empty() + } + + /// Selects a random peer from available peers if any + fn use_rand_available_peer(&mut self) -> Option { + self.peers.iter().choose(&mut rand::thread_rng()).copied() } } @@ -267,10 +265,10 @@ pub struct BlobRequestState { } impl BlobRequestState { - pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self { + pub fn new(block_root: Hash256) -> Self { Self { block_root, - state: SingleLookupRequestState::new(peer_source), + state: SingleLookupRequestState::new(), } } } @@ -285,10 +283,10 @@ pub struct BlockRequestState { } impl BlockRequestState { - pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self { + pub fn new(block_root: Hash256) -> Self { Self { requested_block_root: block_root, - state: SingleLookupRequestState::new(peers), + state: SingleLookupRequestState::new(), } } } @@ -318,12 +316,6 @@ pub enum State { pub struct SingleLookupRequestState { /// State of this request. state: State, - /// Peers that should have this block or blob. - #[derivative(Debug(format_with = "fmt_peer_set"))] - available_peers: HashSet, - /// Peers from which we have requested this block. - #[derivative(Debug = "ignore")] - used_peers: HashSet, /// How many times have we attempted to process this block or blob. failed_processing: u8, /// How many times have we attempted to download this block or blob. @@ -331,16 +323,9 @@ pub struct SingleLookupRequestState { } impl SingleLookupRequestState { - pub fn new(peers: &[PeerId]) -> Self { - let mut available_peers = HashSet::default(); - for peer in peers.iter().copied() { - available_peers.insert(peer); - } - + pub fn new() -> Self { Self { state: State::AwaitingDownload, - available_peers, - used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, } @@ -518,38 +503,6 @@ impl SingleLookupRequestState { pub fn more_failed_processing_attempts(&self) -> bool { self.failed_processing >= self.failed_downloading } - - /// Add peer to this request states. The peer must be able to serve this request. - /// Returns true if the peer is newly inserted. - pub fn add_peer(&mut self, peer_id: &PeerId) -> bool { - self.available_peers.insert(*peer_id) - } - - /// Remove peer from available peers. Return true if there are no more available peers and the - /// request is not expecting any future event (AwaitingDownload). - pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool { - self.available_peers.remove(disconnected_peer_id); - self.available_peers.is_empty() && self.is_awaiting_download() - } - - pub fn get_used_peers(&self) -> impl Iterator { - self.used_peers.iter() - } - - pub fn get_available_peers(&self) -> impl Iterator { - self.available_peers.iter() - } - - /// Selects a random peer from available peers if any, inserts it in used peers and returns it. - pub fn use_rand_available_peer(&mut self) -> Option { - let peer_id = self - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied()?; - self.used_peers.insert(peer_id); - Some(peer_id) - } } // Display is used in the BadState assertions above @@ -573,7 +526,7 @@ impl std::fmt::Debug for State { } } -fn fmt_peer_set( +fn fmt_peer_set_as_len( peer_set: &HashSet, f: &mut std::fmt::Formatter, ) -> Result<(), std::fmt::Error> { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 2a59c24d58..b99598fe90 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -526,8 +526,10 @@ impl TestRig { fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) { self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id)); + } - // Return RPCErrors for all active requests of peer + /// Return RPCErrors for all active requests of peer + fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) { self.drain_network_rx(); while let Ok(request_id) = self.pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { @@ -1265,13 +1267,25 @@ fn test_parent_lookup_too_deep() { } #[test] -fn test_parent_lookup_disconnection_no_peers_left() { +fn test_lookup_peer_disconnected_no_peers_left_while_request() { let mut rig = TestRig::test_setup(); let peer_id = rig.new_connected_peer(); let trigger_block = rig.rand_block(); rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); - rig.peer_disconnected(peer_id); + rig.rpc_error_all_active_requests(peer_id); + rig.expect_no_active_lookups(); +} + +#[test] +fn test_lookup_peer_disconnected_no_peers_left_not_while_request() { + let mut rig = TestRig::test_setup(); + let peer_id = rig.new_connected_peer(); + let trigger_block = rig.rand_block(); + rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); + rig.peer_disconnected(peer_id); + // Note: this test case may be removed in the future. It's not strictly necessary to drop a + // lookup if there are no peers left. Lookup should only be dropped if it can not make progress rig.expect_no_active_lookups(); } @@ -1279,13 +1293,15 @@ fn test_parent_lookup_disconnection_no_peers_left() { fn test_lookup_disconnection_peer_left() { let mut rig = TestRig::test_setup(); let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::>(); + let disconnecting_peer = *peer_ids.first().unwrap(); let block_root = Hash256::random(); // lookup should have two peers associated with the same block for peer_id in peer_ids.iter() { rig.trigger_unknown_block_from_attestation(block_root, *peer_id); } // Disconnect the first peer only, which is the one handling the request - rig.peer_disconnected(*peer_ids.first().unwrap()); + rig.peer_disconnected(disconnecting_peer); + rig.rpc_error_all_active_requests(disconnecting_peer); rig.assert_single_lookups_count(1); }