mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 08:52:54 +00:00
Event-based block lookup tests (#5534)
* WIP * Initial working version of new sync tests. * Remove sync traits and fix lints. * Reduce internal method visibility and make test method instead. Remove extra beacon chain harness instance created in tests. * Improve `SyncTester` api. * Fix lint. * Test example * Lookup tests using rig * Tests should interface with events only * lint * Skip deneb test pre-deneb * Add more assertions * Remove logging changes * Address @jimmygchen comments * Merge branch 'unstable' of https://github.com/sigp/lighthouse into bn-p2p-tests * remove unused assertions * fix lint
This commit is contained in:
@@ -2498,6 +2498,7 @@ pub fn build_log(level: slog::Level, enabled: bool) -> Logger {
|
|||||||
|
|
||||||
pub enum NumBlobs {
|
pub enum NumBlobs {
|
||||||
Random,
|
Random,
|
||||||
|
Number(usize),
|
||||||
None,
|
None,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2518,6 +2519,7 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
|
|||||||
let payload: &mut FullPayloadDeneb<E> = &mut message.body.execution_payload;
|
let payload: &mut FullPayloadDeneb<E> = &mut message.body.execution_payload;
|
||||||
let num_blobs = match num_blobs {
|
let num_blobs = match num_blobs {
|
||||||
NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()),
|
NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()),
|
||||||
|
NumBlobs::Number(n) => n,
|
||||||
NumBlobs::None => 0,
|
NumBlobs::None => 0,
|
||||||
};
|
};
|
||||||
let (bundle, transactions) =
|
let (bundle, transactions) =
|
||||||
@@ -2537,6 +2539,7 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
|
|||||||
let payload: &mut FullPayloadElectra<E> = &mut message.body.execution_payload;
|
let payload: &mut FullPayloadElectra<E> = &mut message.body.execution_payload;
|
||||||
let num_blobs = match num_blobs {
|
let num_blobs = match num_blobs {
|
||||||
NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()),
|
NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()),
|
||||||
|
NumBlobs::Number(n) => n,
|
||||||
NumBlobs::None => 0,
|
NumBlobs::None => 0,
|
||||||
};
|
};
|
||||||
let (bundle, transactions) =
|
let (bundle, transactions) =
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::discovery::CombinedKey;
|
||||||
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
|
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
|
||||||
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
|
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
@@ -671,6 +672,20 @@ impl<E: EthSpec> PeerDB<E> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates the connection state. MUST ONLY BE USED IN TESTS.
|
||||||
|
pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option<BanOperation> {
|
||||||
|
let enr_key = CombinedKey::generate_secp256k1();
|
||||||
|
let enr = Enr::builder().build(&enr_key).unwrap();
|
||||||
|
self.update_connection_state(
|
||||||
|
peer_id,
|
||||||
|
NewConnectionState::Connected {
|
||||||
|
enr: Some(enr),
|
||||||
|
seen_address: Multiaddr::empty(),
|
||||||
|
direction: ConnectionDirection::Outgoing,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
|
/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
|
||||||
/// variables are in sync with libp2p.
|
/// variables are in sync with libp2p.
|
||||||
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
|
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
|
||||||
|
|||||||
@@ -58,3 +58,4 @@ environment = { workspace = true }
|
|||||||
disable-backfill = []
|
disable-backfill = []
|
||||||
fork_from_env = ["beacon_chain/fork_from_env"]
|
fork_from_env = ["beacon_chain/fork_from_env"]
|
||||||
portable = ["beacon_chain/portable"]
|
portable = ["beacon_chain/portable"]
|
||||||
|
test_logger = []
|
||||||
|
|||||||
@@ -3,16 +3,13 @@ use crate::{
|
|||||||
sync::{manager::BlockProcessType, SyncMessage},
|
sync::{manager::BlockProcessType, SyncMessage},
|
||||||
};
|
};
|
||||||
use beacon_chain::block_verification_types::RpcBlock;
|
use beacon_chain::block_verification_types::RpcBlock;
|
||||||
use beacon_chain::{
|
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
|
||||||
builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain,
|
|
||||||
};
|
|
||||||
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
|
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
|
||||||
use beacon_processor::{
|
use beacon_processor::{
|
||||||
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend,
|
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend,
|
||||||
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
|
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
|
||||||
WorkEvent as BeaconWorkEvent,
|
WorkEvent as BeaconWorkEvent,
|
||||||
};
|
};
|
||||||
use environment::null_logger;
|
|
||||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
||||||
use lighthouse_network::{
|
use lighthouse_network::{
|
||||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
|
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
|
||||||
@@ -24,7 +21,6 @@ use std::path::PathBuf;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use store::MemoryStore;
|
use store::MemoryStore;
|
||||||
use task_executor::test_utils::TestRuntime;
|
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::mpsc::{self, error::TrySendError};
|
use tokio::sync::mpsc::{self, error::TrySendError};
|
||||||
use types::*;
|
use types::*;
|
||||||
@@ -667,6 +663,9 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
|
|||||||
// processor (but not much else).
|
// processor (but not much else).
|
||||||
pub fn null_for_testing(
|
pub fn null_for_testing(
|
||||||
network_globals: Arc<NetworkGlobals<E>>,
|
network_globals: Arc<NetworkGlobals<E>>,
|
||||||
|
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
|
||||||
|
executor: TaskExecutor,
|
||||||
|
log: Logger,
|
||||||
) -> (Self, mpsc::Receiver<BeaconWorkEvent<E>>) {
|
) -> (Self, mpsc::Receiver<BeaconWorkEvent<E>>) {
|
||||||
let BeaconProcessorChannels {
|
let BeaconProcessorChannels {
|
||||||
beacon_processor_tx,
|
beacon_processor_tx,
|
||||||
@@ -677,27 +676,17 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
|
|||||||
|
|
||||||
let (network_tx, _network_rx) = mpsc::unbounded_channel();
|
let (network_tx, _network_rx) = mpsc::unbounded_channel();
|
||||||
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
|
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
|
||||||
let log = null_logger().unwrap();
|
|
||||||
let harness: BeaconChainHarness<TestBeaconChainType<E>> =
|
|
||||||
BeaconChainHarness::builder(E::default())
|
|
||||||
.spec(E::default_spec())
|
|
||||||
.deterministic_keypairs(8)
|
|
||||||
.logger(log.clone())
|
|
||||||
.fresh_ephemeral_store()
|
|
||||||
.mock_execution_layer()
|
|
||||||
.build();
|
|
||||||
let runtime = TestRuntime::default();
|
|
||||||
|
|
||||||
let network_beacon_processor = Self {
|
let network_beacon_processor = Self {
|
||||||
beacon_processor_send: beacon_processor_tx,
|
beacon_processor_send: beacon_processor_tx,
|
||||||
duplicate_cache: DuplicateCache::default(),
|
duplicate_cache: DuplicateCache::default(),
|
||||||
chain: harness.chain,
|
chain,
|
||||||
network_tx,
|
network_tx,
|
||||||
sync_tx,
|
sync_tx,
|
||||||
reprocess_tx: work_reprocessing_tx,
|
reprocess_tx: work_reprocessing_tx,
|
||||||
network_globals,
|
network_globals,
|
||||||
invalid_block_storage: InvalidBlockStorage::Disabled,
|
invalid_block_storage: InvalidBlockStorage::Disabled,
|
||||||
executor: runtime.task_executor.clone(),
|
executor,
|
||||||
log,
|
log,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -76,6 +76,24 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn active_single_lookups(&self) -> Vec<Id> {
|
||||||
|
self.single_block_lookups.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn active_parent_lookups(&self) -> Vec<Hash256> {
|
||||||
|
self.parent_lookups
|
||||||
|
.iter()
|
||||||
|
.map(|r| r.chain_hash())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool {
|
||||||
|
self.failed_chains.contains(chain_hash)
|
||||||
|
}
|
||||||
|
|
||||||
/* Lookup requests */
|
/* Lookup requests */
|
||||||
|
|
||||||
/// Creates a lookup for the block with the given `block_root` and immediately triggers it.
|
/// Creates a lookup for the block with the given `block_root` and immediately triggers it.
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -233,24 +233,13 @@ pub fn spawn<T: BeaconChainTypes>(
|
|||||||
);
|
);
|
||||||
|
|
||||||
// create an instance of the SyncManager
|
// create an instance of the SyncManager
|
||||||
let network_globals = beacon_processor.network_globals.clone();
|
let mut sync_manager = SyncManager::new(
|
||||||
let mut sync_manager = SyncManager {
|
beacon_chain,
|
||||||
chain: beacon_chain.clone(),
|
network_send,
|
||||||
input_channel: sync_recv,
|
beacon_processor,
|
||||||
network: SyncNetworkContext::new(
|
sync_recv,
|
||||||
network_send,
|
log.clone(),
|
||||||
beacon_processor.clone(),
|
);
|
||||||
beacon_chain.clone(),
|
|
||||||
log.clone(),
|
|
||||||
),
|
|
||||||
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
|
|
||||||
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
|
|
||||||
block_lookups: BlockLookups::new(
|
|
||||||
beacon_chain.data_availability_checker.clone(),
|
|
||||||
log.clone(),
|
|
||||||
),
|
|
||||||
log: log.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// spawn the sync manager thread
|
// spawn the sync manager thread
|
||||||
debug!(log, "Sync Manager started");
|
debug!(log, "Sync Manager started");
|
||||||
@@ -258,6 +247,48 @@ pub fn spawn<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> SyncManager<T> {
|
impl<T: BeaconChainTypes> SyncManager<T> {
|
||||||
|
pub(crate) fn new(
|
||||||
|
beacon_chain: Arc<BeaconChain<T>>,
|
||||||
|
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
|
beacon_processor: Arc<NetworkBeaconProcessor<T>>,
|
||||||
|
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
|
||||||
|
log: slog::Logger,
|
||||||
|
) -> Self {
|
||||||
|
let network_globals = beacon_processor.network_globals.clone();
|
||||||
|
Self {
|
||||||
|
chain: beacon_chain.clone(),
|
||||||
|
input_channel: sync_recv,
|
||||||
|
network: SyncNetworkContext::new(
|
||||||
|
network_send,
|
||||||
|
beacon_processor.clone(),
|
||||||
|
beacon_chain.clone(),
|
||||||
|
log.clone(),
|
||||||
|
),
|
||||||
|
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
|
||||||
|
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
|
||||||
|
block_lookups: BlockLookups::new(
|
||||||
|
beacon_chain.data_availability_checker.clone(),
|
||||||
|
log.clone(),
|
||||||
|
),
|
||||||
|
log: log.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn active_single_lookups(&self) -> Vec<Id> {
|
||||||
|
self.block_lookups.active_single_lookups()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn active_parent_lookups(&self) -> Vec<Hash256> {
|
||||||
|
self.block_lookups.active_parent_lookups()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool {
|
||||||
|
self.block_lookups.failed_chains_contains(chain_hash)
|
||||||
|
}
|
||||||
|
|
||||||
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
|
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
|
||||||
self.network.network_globals()
|
self.network.network_globals()
|
||||||
}
|
}
|
||||||
@@ -597,7 +628,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
|
pub(crate) fn handle_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
|
||||||
match sync_message {
|
match sync_message {
|
||||||
SyncMessage::AddPeer(peer_id, info) => {
|
SyncMessage::AddPeer(peer_id, info) => {
|
||||||
self.add_peer(peer_id, info);
|
self.add_peer(peer_id, info);
|
||||||
@@ -648,11 +679,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
ChildComponents::new(block_root, None, Some(blobs)),
|
ChildComponents::new(block_root, None, Some(blobs)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
|
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
|
||||||
// If we are not synced, ignore this block.
|
// If we are not synced, ignore this block.
|
||||||
if self.synced_and_connected(&peer_id) {
|
if self.synced_and_connected(&peer_id) {
|
||||||
|
debug!(self.log, "Received sync_message"; "message" => "UnknownBlockHashFromAttestation", "block_root" => %block_root);
|
||||||
self.block_lookups
|
self.block_lookups
|
||||||
.search_block(block_hash, &[peer_id], &mut self.network);
|
.search_block(block_root, &[peer_id], &mut self.network);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SyncMessage::Disconnect(peer_id) => {
|
SyncMessage::Disconnect(peer_id) => {
|
||||||
|
|||||||
@@ -637,7 +637,12 @@ mod tests {
|
|||||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||||
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
|
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
|
||||||
let (network_beacon_processor, beacon_processor_rx) =
|
let (network_beacon_processor, beacon_processor_rx) =
|
||||||
NetworkBeaconProcessor::null_for_testing(globals.clone());
|
NetworkBeaconProcessor::null_for_testing(
|
||||||
|
globals.clone(),
|
||||||
|
chain.clone(),
|
||||||
|
harness.runtime.task_executor.clone(),
|
||||||
|
log.clone(),
|
||||||
|
);
|
||||||
let cx = SyncNetworkContext::new(
|
let cx = SyncNetworkContext::new(
|
||||||
network_tx,
|
network_tx,
|
||||||
Arc::new(network_beacon_processor),
|
Arc::new(network_beacon_processor),
|
||||||
|
|||||||
Reference in New Issue
Block a user