Write range sync tests in external event-driven form (#6618)

* Write range sync tests in external event-driven form

* Fix remaining test

* Drop unused generics

* Merge branch 'unstable' into range-sync-tests

* Add reference to test author

* Use async await

* Fix failing test. Not sure how it was passing before without an EL.
This commit is contained in:
Lion - dapplion
2024-12-16 13:44:10 +08:00
committed by GitHub
parent 75d90795be
commit 1c5be34def
8 changed files with 328 additions and 512 deletions

View File

@@ -1,13 +0,0 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use types::Hash256;
/// Trait that helps maintain RangeSync's implementation split from the BeaconChain
pub trait BlockStorage {
    /// Returns `true` if the block with the given root is already known to storage.
    fn is_block_known(&self, block_root: &Hash256) -> bool;
}
// Canonical implementation: delegate the "known block" query to fork choice.
impl<T: BeaconChainTypes> BlockStorage for BeaconChain<T> {
    fn is_block_known(&self, block_root: &Hash256) -> bool {
        self.block_is_known_to_fork_choice(block_root)
    }
}

View File

@@ -3,12 +3,11 @@
//! Each chain type is stored in its own map. A variety of helper functions are given along with
//! this struct to simplify the logic of the other layers of sync.
use super::block_storage::BlockStorage;
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::BeaconChainTypes;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::SyncInfo;
@@ -37,10 +36,13 @@ pub enum RangeSyncState {
Idle,
}
pub type SyncChainStatus =
Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str>;
/// A collection of finalized and head chains currently being processed.
pub struct ChainCollection<T: BeaconChainTypes, C> {
pub struct ChainCollection<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Arc<C>,
beacon_chain: Arc<BeaconChain<T>>,
/// The set of finalized chains being synced.
finalized_chains: FnvHashMap<ChainId, SyncingChain<T>>,
/// The set of head chains being synced.
@@ -51,8 +53,8 @@ pub struct ChainCollection<T: BeaconChainTypes, C> {
log: slog::Logger,
}
impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: slog::Logger) -> Self {
ChainCollection {
beacon_chain,
finalized_chains: FnvHashMap::default(),
@@ -213,9 +215,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
}
}
pub fn state(
&self,
) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str> {
pub fn state(&self) -> SyncChainStatus {
match self.state {
RangeSyncState::Finalized(ref syncing_id) => {
let chain = self
@@ -409,7 +409,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
let log_ref = &self.log;
let is_outdated = |target_slot: &Slot, target_root: &Hash256| {
target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root)
target_slot <= &local_finalized_slot
|| beacon_chain.block_is_known_to_fork_choice(target_root)
};
// Retain only head peers that remain relevant

View File

@@ -2,7 +2,6 @@
//! peers.
mod batch;
mod block_storage;
mod chain;
mod chain_collection;
mod range;
@@ -13,5 +12,7 @@ pub use batch::{
ByRangeRequestType,
};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
#[cfg(test)]
pub use chain_collection::SyncChainStatus;
pub use range::RangeSync;
pub use sync_type::RangeSyncType;

View File

@@ -39,9 +39,8 @@
//! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially
//! and further batches are requested as current blocks are being processed.
use super::block_storage::BlockStorage;
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::chain_collection::{ChainCollection, SyncChainStatus};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::status::ToStatusMessage;
@@ -56,7 +55,7 @@ use lru_cache::LRUTimeCache;
use slog::{crit, debug, trace, warn};
use std::collections::HashMap;
use std::sync::Arc;
use types::{Epoch, EthSpec, Hash256, Slot};
use types::{Epoch, EthSpec, Hash256};
/// For how long we store failed finalized chains to prevent retries.
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
@@ -64,27 +63,26 @@ const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
/// holds the current state of the long range sync.
pub struct RangeSync<T: BeaconChainTypes, C = BeaconChain<T>> {
pub struct RangeSync<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Arc<C>,
beacon_chain: Arc<BeaconChain<T>>,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T, C>,
chains: ChainCollection<T>,
/// Chains that have failed and are stored to prevent being retried.
failed_chains: LRUTimeCache<Hash256>,
/// The syncing logger.
log: slog::Logger,
}
impl<T: BeaconChainTypes, C> RangeSync<T, C>
impl<T: BeaconChainTypes> RangeSync<T>
where
C: BlockStorage + ToStatusMessage,
T: BeaconChainTypes,
{
pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: slog::Logger) -> Self {
RangeSync {
beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain, log.clone()),
@@ -96,9 +94,7 @@ where
}
}
pub fn state(
&self,
) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str> {
pub fn state(&self) -> SyncChainStatus {
self.chains.state()
}
@@ -382,465 +378,3 @@ where
}
}
}
// Event-driven unit tests for `RangeSync`, using `FakeStorage` as a stand-in
// for the beacon chain's block-knowledge and status reporting.
#[cfg(test)]
mod tests {
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::SyncMessage;
use crate::NetworkMessage;
use super::*;
use crate::sync::network_context::{BlockOrBlob, RangeRequestId};
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock;
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_chain::EngineState;
use beacon_processor::WorkEvent as BeaconWorkEvent;
use lighthouse_network::service::api_types::SyncRequestId;
use lighthouse_network::{
rpc::StatusMessage, service::api_types::AppRequestId, NetworkConfig, NetworkGlobals,
};
use slog::{o, Drain};
use slot_clock::TestingSlotClock;
use std::collections::HashSet;
use store::MemoryStore;
use tokio::sync::mpsc;
use types::{FixedBytesExtended, ForkName, MinimalEthSpec as E};
/// Minimal in-memory chain stand-in: records which block roots sync should
/// treat as known, and the status message used to derive our local sync info.
#[derive(Debug)]
struct FakeStorage {
// Set of block roots the fake chain claims to know.
known_blocks: RwLock<HashSet<Hash256>>,
// Status message returned by `ToStatusMessage`; mutable so tests can set up scenarios.
status: RwLock<StatusMessage>,
}
// Default: no known blocks and a zeroed status message.
impl Default for FakeStorage {
fn default() -> Self {
FakeStorage {
known_blocks: RwLock::new(HashSet::new()),
status: RwLock::new(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: 0usize.into(),
head_root: Hash256::zero(),
head_slot: 0usize.into(),
}),
}
}
}
impl FakeStorage {
/// Mark `block_root` as known to the fake chain.
fn remember_block(&self, block_root: Hash256) {
self.known_blocks.write().insert(block_root);
}
/// Remove a previously remembered block root.
#[allow(dead_code)]
fn forget_block(&self, block_root: &Hash256) {
self.known_blocks.write().remove(block_root);
}
}
// Sync queries block knowledge through this trait instead of a real BeaconChain.
impl BlockStorage for FakeStorage {
fn is_block_known(&self, block_root: &store::Hash256) -> bool {
self.known_blocks.read().contains(block_root)
}
}
// Reports the (test-configurable) status message.
impl ToStatusMessage for FakeStorage {
fn status_message(&self) -> StatusMessage {
self.status.read().clone()
}
}
/// Harness witness type for these tests: testing slot clock + in-memory stores.
type TestBeaconChainType =
Witness<TestingSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
/// Build a terminal logger; when `enabled` is false every record is filtered out.
fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}
/// Test fixture bundling the channels and context needed to drive `RangeSync`.
#[allow(unused)]
struct TestRig {
log: slog::Logger,
/// To check what sync sends to the beacon processor.
beacon_processor_rx: mpsc::Receiver<BeaconWorkEvent<E>>,
/// To set up different scenarios where sync is told about known/unknown blocks.
chain: Arc<FakeStorage>,
/// Needed by range to handle communication with the network.
cx: SyncNetworkContext<TestBeaconChainType>,
/// To check what the network receives from Range.
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
/// To modify what the network declares about various global variables, in particular about
/// the sync state of a peer.
globals: Arc<NetworkGlobals<E>>,
}
// Assertion helpers on the current range sync state.
impl RangeSync<TestBeaconChainType, FakeStorage> {
/// Assert range is syncing and its sync type matches `expected_state`.
fn assert_state(&self, expected_state: RangeSyncType) {
assert_eq!(
self.state()
.expect("State is ok")
.expect("Range is syncing")
.0,
expected_state
)
}
/// Assert range is not currently syncing any chain.
#[allow(dead_code)]
fn assert_not_syncing(&self) {
assert!(
self.state().expect("State is ok").is_none(),
"Range should not be syncing."
);
}
}
impl TestRig {
/// Derive our local `SyncInfo` from the fake chain's status message.
fn local_info(&self) -> SyncInfo {
let StatusMessage {
fork_digest: _,
finalized_root,
finalized_epoch,
head_root,
head_slot,
} = self.chain.status.read().clone();
SyncInfo {
head_slot,
head_root,
finalized_epoch,
finalized_root,
}
}
/// Reads a BlocksByRange request to a given peer from the network receiver channel.
#[track_caller]
fn grab_request(
&mut self,
expected_peer: &PeerId,
fork_name: ForkName,
) -> (AppRequestId, Option<AppRequestId>) {
// First message must be the block request addressed to `expected_peer`.
let block_req_id = if let Ok(NetworkMessage::SendRequest {
peer_id,
request: _,
request_id,
}) = self.network_rx.try_recv()
{
assert_eq!(&peer_id, expected_peer);
request_id
} else {
panic!("Should have sent a batch request to the peer")
};
// Post-Deneb forks additionally send a blobs-by-range request.
let blob_req_id = if fork_name.deneb_enabled() {
if let Ok(NetworkMessage::SendRequest {
peer_id,
request: _,
request_id,
}) = self.network_rx.try_recv()
{
assert_eq!(&peer_id, expected_peer);
Some(request_id)
} else {
panic!("Should have sent a batch request to the peer")
}
} else {
None
};
(block_req_id, blob_req_id)
}
/// Complete a block (and, if present, blob) request with empty responses,
/// returning the chain id, batch id and request id of the completed batch.
fn complete_range_block_and_blobs_response(
&mut self,
block_req: AppRequestId,
blob_req_opt: Option<AppRequestId>,
) -> (ChainId, BatchId, Id) {
if blob_req_opt.is_some() {
match block_req {
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
// Terminate the block stream first; the coupled request only
// yields a response once the blob stream also terminates.
let _ = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None));
let response = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Blob(None))
.unwrap();
let (chain_id, batch_id) =
TestRig::unwrap_range_request_id(response.sender_id);
(chain_id, batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
} else {
match block_req {
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
let response = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None))
.unwrap();
let (chain_id, batch_id) =
TestRig::unwrap_range_request_id(response.sender_id);
(chain_id, batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
}
}
/// Extract the chain and batch ids from a RangeSync sender id; panics on any other variant.
fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) {
if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id {
(chain_id, batch_id)
} else {
panic!("expected RangeSync request: {:?}", sender_id)
}
}
/// Produce a head peer
fn head_peer(
&self,
) -> (
PeerId,
SyncInfo, /* Local info */
SyncInfo, /* Remote info */
) {
let local_info = self.local_info();
// Get a peer with an advanced head
let head_root = Hash256::random();
let head_slot = local_info.head_slot + 1;
let remote_info = SyncInfo {
head_root,
head_slot,
..local_info
};
let peer_id = PeerId::random();
(peer_id, local_info, remote_info)
}
/// Produce a peer whose finalized epoch is two epochs ahead of ours.
fn finalized_peer(
&self,
) -> (
PeerId,
SyncInfo, /* Local info */
SyncInfo, /* Remote info */
) {
let local_info = self.local_info();
let finalized_root = Hash256::random();
let finalized_epoch = local_info.finalized_epoch + 2;
let head_slot = finalized_epoch.start_slot(E::slots_per_epoch());
let head_root = Hash256::random();
let remote_info = SyncInfo {
finalized_epoch,
finalized_root,
head_slot,
head_root,
};
let peer_id = PeerId::random();
(peer_id, local_info, remote_info)
}
/// Assert no work event has been sent to the beacon processor.
#[track_caller]
fn expect_empty_processor(&mut self) {
match self.beacon_processor_rx.try_recv() {
Ok(work) => {
panic!(
"Expected empty processor. Instead got {}",
work.work_type_str()
);
}
Err(e) => match e {
mpsc::error::TryRecvError::Empty => {}
mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"),
},
}
}
/// Assert the next beacon-processor work event is a chain segment.
#[track_caller]
fn expect_chain_segment(&mut self) {
match self.beacon_processor_rx.try_recv() {
Ok(work) => {
assert_eq!(work.work_type(), beacon_processor::WorkType::ChainSegment);
}
other => panic!("Expected chain segment process, found {:?}", other),
}
}
}
/// Build the test rig plus a `RangeSync` backed by `FakeStorage`.
/// `log_enabled` toggles terminal log output for debugging a failing test.
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, FakeStorage>) {
let log = build_log(slog::Level::Trace, log_enabled);
// Initialise a new beacon chain
let harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
.default_spec()
.logger(log.clone())
.deterministic_keypairs(1)
.fresh_ephemeral_store()
.build();
let chain = harness.chain;
// Range itself talks to the fake store, not the harness chain.
let fake_store = Arc::new(FakeStorage::default());
let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new(
fake_store.clone(),
log.new(o!("component" => "range")),
);
let (network_tx, network_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
let network_config = Arc::new(NetworkConfig::default());
let globals = Arc::new(NetworkGlobals::new_test_globals(
Vec::new(),
&log,
network_config,
chain.spec.clone(),
));
let (network_beacon_processor, beacon_processor_rx) =
NetworkBeaconProcessor::null_for_testing(
globals.clone(),
sync_tx,
chain.clone(),
harness.runtime.task_executor.clone(),
log.clone(),
);
let cx = SyncNetworkContext::new(
network_tx,
Arc::new(network_beacon_processor),
chain,
log.new(o!("component" => "network_context")),
);
let test_rig = TestRig {
log,
beacon_processor_rx,
chain: fake_store,
cx,
network_rx,
globals,
};
(test_rig, range_sync)
}
// Removing a failed head chain must not disturb an in-progress finalized sync.
#[test]
fn head_chain_removed_while_finalized_syncing() {
// NOTE: this is a regression test.
let (mut rig, mut range) = range(false);
// Get a peer with an advanced head
let (head_peer, local_info, remote_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, head_peer, remote_info);
range.assert_state(RangeSyncType::Head);
let fork = rig
.cx
.chain
.spec
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
// Sync should have requested a batch, grab the request.
let _ = rig.grab_request(&head_peer, fork);
// Now get a peer with an advanced finalized epoch.
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
range.assert_state(RangeSyncType::Finalized);
// Sync should have requested a batch, grab the request
let _ = rig.grab_request(&finalized_peer, fork);
// Fail the head chain by disconnecting the peer.
range.remove_peer(&mut rig.cx, &head_peer);
range.assert_state(RangeSyncType::Finalized);
}
// Adding a peer while both chains' targets are already known must not panic
// during the purge/update that follows.
#[test]
fn state_update_while_purging() {
// NOTE: this is a regression test.
let (mut rig, mut range) = range(true);
// Get a peer with an advanced head
let (head_peer, local_info, head_info) = rig.head_peer();
let head_peer_root = head_info.head_root;
range.add_peer(&mut rig.cx, local_info, head_peer, head_info);
range.assert_state(RangeSyncType::Head);
let fork = rig
.cx
.chain
.spec
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
// Sync should have requested a batch, grab the request.
let _ = rig.grab_request(&head_peer, fork);
// Now get a peer with an advanced finalized epoch.
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
let finalized_peer_root = remote_info.finalized_root;
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
range.assert_state(RangeSyncType::Finalized);
// Sync should have requested a batch, grab the request
let _ = rig.grab_request(&finalized_peer, fork);
// Now the chain knows both chains target roots.
rig.chain.remember_block(head_peer_root);
rig.chain.remember_block(finalized_peer_root);
// Add an additional peer to the second chain to make range update its status
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
}
// Batches completed while the execution engine is offline must be held back
// and only dispatched to the beacon processor once range is resumed.
#[test]
fn pause_and_resume_on_ee_offline() {
let (mut rig, mut range) = range(true);
let fork = rig
.cx
.chain
.spec
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
// add some peers
let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork);
let (chain1, batch1, id1) =
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
// make the ee offline
rig.cx.update_execution_engine_state(EngineState::Offline);
// send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]);
// the beacon processor shouldn't have received any work
rig.expect_empty_processor();
// while the ee is offline, more peers might arrive. Add a new finalized peer.
let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork);
let (chain2, batch2, id2) =
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
// send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]);
// the beacon processor shouldn't have received any work
rig.expect_empty_processor();
// make the beacon processor available again.
rig.cx.update_execution_engine_state(EngineState::Online);
// now resume range, we should have two processing requests in the beacon processor.
range.resume(&mut rig.cx);
rig.expect_chain_segment();
rig.expect_chain_segment();
}
}

View File

@@ -1,10 +1,9 @@
//! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and
//! of a remote.
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::SyncInfo;
use super::block_storage::BlockStorage;
/// The type of Range sync that should be done relative to our current state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeSyncType {
@@ -17,8 +16,8 @@ pub enum RangeSyncType {
impl RangeSyncType {
/// Determines the type of sync given our local `PeerSyncInfo` and the remote's
/// `PeerSyncInfo`.
pub fn new<C: BlockStorage>(
chain: &C,
pub fn new<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
local_info: &SyncInfo,
remote_info: &SyncInfo,
) -> RangeSyncType {
@@ -29,7 +28,7 @@ impl RangeSyncType {
// not seen the finalized hash before.
if remote_info.finalized_epoch > local_info.finalized_epoch
&& !chain.is_block_known(&remote_info.finalized_root)
&& !chain.block_is_known_to_fork_choice(&remote_info.finalized_root)
{
RangeSyncType::Finalized
} else {