mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-17 10:48:28 +00:00
Merge branch 'unstable' into gloas-lookup-sync-fixes
This commit is contained in:
@@ -793,12 +793,39 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
);
|
||||
}
|
||||
|
||||
/// Updates the connection state. MUST ONLY BE USED IN TESTS.
|
||||
pub fn __add_connected_peer_testing_only(
|
||||
/// Adds a connected peer to the PeerDB and sets the custody subnets.
|
||||
/// WARNING: This updates the connection state. MUST ONLY BE USED IN TESTS.
|
||||
pub fn __add_connected_peer_with_custody_subnets(
|
||||
&mut self,
|
||||
supernode: bool,
|
||||
spec: &ChainSpec,
|
||||
enr_key: CombinedKey,
|
||||
) -> PeerId {
|
||||
let peer_id = self.__add_connected_peer(supernode, enr_key, spec);
|
||||
|
||||
let subnets = if supernode {
|
||||
(0..spec.data_column_sidecar_subnet_count)
|
||||
.map(|subnet_id| subnet_id.into())
|
||||
.collect()
|
||||
} else {
|
||||
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
|
||||
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
|
||||
.expect("should compute custody subnets")
|
||||
};
|
||||
|
||||
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
|
||||
peer_info.set_custody_subnets(subnets);
|
||||
|
||||
peer_id
|
||||
}
|
||||
|
||||
/// Adds a connected peer to the PeerDB and updates the connection state.
|
||||
/// MUST ONLY BE USED IN TESTS.
|
||||
pub fn __add_connected_peer(
|
||||
&mut self,
|
||||
supernode: bool,
|
||||
enr_key: CombinedKey,
|
||||
spec: &ChainSpec,
|
||||
) -> PeerId {
|
||||
let mut enr = Enr::builder().build(&enr_key).unwrap();
|
||||
let peer_id = enr.peer_id();
|
||||
@@ -835,24 +862,21 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
},
|
||||
);
|
||||
|
||||
if supernode {
|
||||
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
|
||||
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
|
||||
.map(|subnet_id| subnet_id.into())
|
||||
.collect();
|
||||
peer_info.set_custody_subnets(all_subnets);
|
||||
} else {
|
||||
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
|
||||
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
|
||||
let subnets =
|
||||
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
|
||||
.expect("should compute custody subnets");
|
||||
peer_info.set_custody_subnets(subnets);
|
||||
}
|
||||
|
||||
peer_id
|
||||
}
|
||||
|
||||
/// MUST ONLY BE USED IN TESTS.
|
||||
pub fn __set_custody_subnets(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
custody_subnets: HashSet<DataColumnSubnetId>,
|
||||
) -> Result<(), String> {
|
||||
self.peers
|
||||
.get_mut(peer_id)
|
||||
.map(|info| info.set_custody_subnets(custody_subnets))
|
||||
.ok_or_else(|| "Cannot set custody subnets, peer not found".to_string())
|
||||
}
|
||||
|
||||
/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
|
||||
/// variables are in sync with libp2p.
|
||||
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
|
||||
|
||||
@@ -1255,7 +1255,7 @@ mod tests {
|
||||
let peer_id = network_globals
|
||||
.peers
|
||||
.write()
|
||||
.__add_connected_peer_testing_only(
|
||||
.__add_connected_peer_with_custody_subnets(
|
||||
true,
|
||||
&beacon_chain.spec,
|
||||
k256::ecdsa::SigningKey::random(&mut rng).into(),
|
||||
|
||||
@@ -31,13 +31,15 @@ use lighthouse_network::{
|
||||
types::SyncState,
|
||||
};
|
||||
use slot_clock::{SlotClock, TestingSlotClock};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::info;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, ForkContext, ForkName, Hash256,
|
||||
MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSubnetId,
|
||||
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
const D: Duration = Duration::new(0, 0);
|
||||
@@ -1535,7 +1537,7 @@ impl TestRig {
|
||||
.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.__add_connected_peer_testing_only(false, &self.harness.spec, key);
|
||||
.__add_connected_peer_with_custody_subnets(false, &self.harness.spec, key);
|
||||
|
||||
// Assumes custody subnet count == column count
|
||||
let custody_subnets = self
|
||||
@@ -1566,13 +1568,38 @@ impl TestRig {
|
||||
.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.__add_connected_peer_testing_only(true, &self.harness.spec, key);
|
||||
.__add_connected_peer_with_custody_subnets(true, &self.harness.spec, key);
|
||||
self.log(&format!(
|
||||
"Added new peer for testing {peer_id:?}, custody: supernode"
|
||||
));
|
||||
peer_id
|
||||
}
|
||||
|
||||
/// Add a connected supernode peer, but without setting the peers' custody subnet.
|
||||
/// This is to simulate the real behaviour where metadata is only received some time after
|
||||
/// a connection is established.
|
||||
pub fn new_connected_supernode_peer_no_metadata_custody_subnet(&mut self) -> PeerId {
|
||||
let key = self.determinstic_key();
|
||||
self.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.__add_connected_peer(true, key, &self.harness.spec)
|
||||
}
|
||||
|
||||
/// Update the peer's custody subnet in PeerDB and send a `UpdatedPeerCgc` message to sync.
|
||||
pub fn send_peer_cgc_update_to_sync(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
subnets: HashSet<DataColumnSubnetId>,
|
||||
) {
|
||||
self.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.__set_custody_subnets(peer_id, subnets)
|
||||
.unwrap();
|
||||
self.send_sync_message(SyncMessage::UpdatedPeerCgc(*peer_id))
|
||||
}
|
||||
|
||||
fn determinstic_key(&mut self) -> CombinedKey {
|
||||
k256::ecdsa::SigningKey::random(&mut self.rng_08).into()
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::sync::range_sync::RangeSyncType;
|
||||
use lighthouse_network::rpc::RPCError;
|
||||
use lighthouse_network::rpc::methods::StatusMessageV2;
|
||||
use lighthouse_network::{PeerId, SyncInfo};
|
||||
use std::collections::HashSet;
|
||||
use types::{Epoch, EthSpec, Hash256, MinimalEthSpec as E, Slot};
|
||||
|
||||
/// MinimalEthSpec has 8 slots per epoch
|
||||
@@ -57,7 +58,7 @@ impl TestRig {
|
||||
finalized_root: Hash256::random(),
|
||||
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
|
||||
head_root: Hash256::random(),
|
||||
earliest_available_slot: None,
|
||||
earliest_available_slot: Some(Slot::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -510,3 +511,76 @@ async fn not_enough_custody_peers_then_peers_arrive() {
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_range_sync_completed();
|
||||
}
|
||||
|
||||
/// This is a regression test for the following race condition scenario:
|
||||
/// 1. A node is connected to 3 supernode peers: peer 1 is synced, & peer 2 and 3 are advanced.
|
||||
/// 2. No metadata has been received yet (i.e. no custody info), so the node cannot start data
|
||||
/// column range sync yet.
|
||||
/// 3. Now peer 1 sends the CGC via metadata response, we now have one peer on all custody subnets,
|
||||
/// BUT not on the finalized syncing chain.
|
||||
/// 4. The node tries to `send_batch` but fails repeatedly with `NoPeers`, as there's no peer
|
||||
/// that is able to serve columns for the advanced epochs. The chain is removed after 5 failed attempts.
|
||||
/// 5. Now peer 2 & 3 send CGC updates, BUT because there's no syncing chain, nothing happens -
|
||||
/// sync is stuck until finding new peers.
|
||||
///
|
||||
/// The expected behaviour in this scenario should be:
|
||||
/// 4. not finding suitable peers, chain is kept and batch remains in AwaitingDownload
|
||||
/// 5. finalized sync should resume as soon as CGC updates are received from peer 2 or 3.
|
||||
#[tokio::test]
|
||||
async fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() {
|
||||
let mut r = TestRig::default();
|
||||
if !r.fork_name.fulu_enabled() || r.skip_range_sync_under_gloas() {
|
||||
return;
|
||||
}
|
||||
|
||||
// GIVEN: the node is connected to 3 supernode peers:
|
||||
let advanced_epochs: usize = 2;
|
||||
let sync_epochs = advanced_epochs + 3;
|
||||
let sync_slots = sync_epochs * SLOTS_PER_EPOCH - 1;
|
||||
r.build_chain(sync_slots).await;
|
||||
r.harness.set_current_slot(Slot::new(sync_slots as u64 + 1));
|
||||
|
||||
// Peer 1 is synced (same finalized epoch), but its earliest available slot means it
|
||||
// cannot serve the batches needed for this sync.
|
||||
let peer_1 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
|
||||
let mut remote_info = r.local_info().clone();
|
||||
remote_info.earliest_available_slot = Some(Slot::new(sync_slots as u64));
|
||||
r.send_sync_message(SyncMessage::AddPeer(peer_1, remote_info));
|
||||
|
||||
// Peer 2 is advanced (local finalized epoch + 2)
|
||||
let peer_2 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
|
||||
let remote_info = r.finalized_remote_info_advanced_by((advanced_epochs as u64).into());
|
||||
r.send_sync_message(SyncMessage::AddPeer(peer_2, remote_info.clone()));
|
||||
// We expect a finalized chain to be created with peer 2, but no requests sent out yet due to missing custody info.
|
||||
r.assert_state(RangeSyncType::Finalized);
|
||||
r.assert_empty_network();
|
||||
|
||||
// Peer 3 is connected and advanced
|
||||
let peer_3 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
|
||||
r.send_sync_message(SyncMessage::AddPeer(peer_3, remote_info));
|
||||
// We are still in finalized sync state (now with peer 3 added)
|
||||
r.assert_state(RangeSyncType::Finalized);
|
||||
|
||||
for (i, p) in [peer_1, peer_2, peer_3].iter().enumerate() {
|
||||
let peer_idx = i + 1;
|
||||
r.log(&format!("Peer {peer_idx}: {p:?}"));
|
||||
}
|
||||
|
||||
// WHEN: peer 1 sends its CGC via metadata response
|
||||
let all_custody_subnets = (0..r.harness.spec.data_column_sidecar_subnet_count)
|
||||
.map(|i| i.into())
|
||||
.collect::<HashSet<_>>();
|
||||
r.send_peer_cgc_update_to_sync(&peer_1, all_custody_subnets.clone());
|
||||
|
||||
// We still don't have any peers on the syncing chain with custody columns (only peer 1)
|
||||
// The node won't send the batch and will remain in the finalized sync state (this was failing before!)
|
||||
r.assert_state(RangeSyncType::Finalized);
|
||||
r.assert_empty_network();
|
||||
|
||||
// Now we receive peer 2 & 3's CGC updates, the node will resume syncing from these two peers
|
||||
r.send_peer_cgc_update_to_sync(&peer_2, all_custody_subnets.clone());
|
||||
r.send_peer_cgc_update_to_sync(&peer_3, all_custody_subnets);
|
||||
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_range_sync_completed();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user