Sync wrong dbg assertion (#2821)

## Issue Addressed

Running a beacon node I triggered a sync debug panic. And so finally the time to create tests for sync arrived. Fortunately, te bug was not in the sync algorithm itself but a wrong assertion

## Proposed Changes

- Split Range's impl from the BeaconChain via a trait. This is needed for testing. The TestingRig/Harness is way bigger than needed and does not provide the modification functionalities that are needed to test sync. I find this simpler, tho some could disagree.
- Add a regression test for sync that fails before the changes.
- Fix the wrong assertion.
This commit is contained in:
Divma
2021-11-19 02:38:25 +00:00
parent e519af9012
commit 31386277c3
7 changed files with 306 additions and 27 deletions

View File

@@ -39,6 +39,7 @@
//! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially
//! and further batches are requested as current blocks are being processed.
use super::block_storage::BlockStorage;
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
@@ -58,24 +59,28 @@ use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
/// holds the current state of the long range sync.
pub struct RangeSync<T: BeaconChainTypes> {
pub struct RangeSync<T: BeaconChainTypes, C = Arc<BeaconChain<T>>> {
/// The beacon chain for processing.
beacon_chain: Arc<BeaconChain<T>>,
beacon_chain: C,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T>,
chains: ChainCollection<T, C>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
/// The syncing logger.
log: slog::Logger,
}
impl<T: BeaconChainTypes> RangeSync<T> {
impl<T: BeaconChainTypes, C> RangeSync<T, C>
where
C: BlockStorage + Clone + ToStatusMessage,
T: BeaconChainTypes,
{
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
beacon_chain: C,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
log: slog::Logger,
) -> Self {
@@ -356,3 +361,257 @@ impl<T: BeaconChainTypes> RangeSync<T> {
);
}
}
#[cfg(test)]
mod tests {
use crate::NetworkMessage;
use super::*;
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock;
use lighthouse_network::rpc::BlocksByRangeRequest;
use lighthouse_network::{libp2p, Request};
use lighthouse_network::{rpc::StatusMessage, NetworkGlobals};
use slog::{o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use store::MemoryStore;
use types::{Hash256, MinimalEthSpec as E};
#[derive(Debug)]
struct FakeStorage {
is_block_known: AtomicBool,
status: RwLock<StatusMessage>,
}
impl Default for FakeStorage {
fn default() -> Self {
FakeStorage {
is_block_known: AtomicBool::new(false),
status: RwLock::new(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: 0usize.into(),
head_root: Hash256::zero(),
head_slot: 0usize.into(),
}),
}
}
}
impl BlockStorage for Arc<FakeStorage> {
fn is_block_known(&self, _block_root: &store::Hash256) -> bool {
self.is_block_known
.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl ToStatusMessage for Arc<FakeStorage> {
fn status_message(&self) -> Result<StatusMessage, beacon_chain::BeaconChainError> {
Ok(self.status.read().clone())
}
}
type TestBeaconChainType =
Witness<SystemTimeSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}
#[allow(unused)]
struct TestRig {
log: slog::Logger,
/// To check what does sync send to the beacon processor.
beacon_processor_rx: mpsc::Receiver<BeaconWorkEvent<TestBeaconChainType>>,
/// To set up different scenarios where sync is told about known/unkown blocks.
chain: Arc<FakeStorage>,
/// Needed by range to handle communication with the network.
cx: SyncNetworkContext<E>,
/// To check what the network receives from Range.
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
/// To modify what the network declares about various global variables, in particular about
/// the sync state of a peer.
globals: Arc<NetworkGlobals<E>>,
}
impl RangeSync<TestBeaconChainType, Arc<FakeStorage>> {
fn assert_state(&self, expected_state: RangeSyncType) {
assert_eq!(
self.state()
.expect("State is ok")
.expect("Range is syncing")
.0,
expected_state
)
}
}
impl TestRig {
fn local_info(&self) -> SyncInfo {
let StatusMessage {
fork_digest: _,
finalized_root,
finalized_epoch,
head_root,
head_slot,
} = self.chain.status.read().clone();
SyncInfo {
head_slot,
head_root,
finalized_epoch,
finalized_root,
}
}
/// Reads an BlocksByRange request to a given peer from the network receiver channel.
fn grab_request(
&mut self,
expected_peer: &PeerId,
) -> (lighthouse_network::rpc::RequestId, BlocksByRangeRequest) {
if let Some(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRange(request),
request_id,
}) = self.network_rx.blocking_recv()
{
assert_eq!(&peer_id, expected_peer);
(request_id, request)
} else {
panic!("Should have sent a batch request to the peer")
}
}
/// Produce a head peer
fn head_peer(
&self,
) -> (
PeerId,
SyncInfo, /* Local info */
SyncInfo, /* Remote info */
) {
let local_info = self.local_info();
// Get a peer with an advanced head
let head_root = Hash256::random();
let head_slot = local_info.head_slot + 1;
let remote_info = SyncInfo {
head_root,
head_slot,
..local_info
};
let peer_id = PeerId::random();
(peer_id, local_info, remote_info)
}
fn finalized_peer(
&self,
) -> (
PeerId,
SyncInfo, /* Local info */
SyncInfo, /* Remote info */
) {
let local_info = self.local_info();
let finalized_root = Hash256::random();
let finalized_epoch = local_info.finalized_epoch + 1;
let head_slot = finalized_epoch.start_slot(E::slots_per_epoch());
let head_root = Hash256::random();
let remote_info = SyncInfo {
finalized_epoch,
finalized_root,
head_slot,
head_root,
};
let peer_id = PeerId::random();
(peer_id, local_info, remote_info)
}
}
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, Arc<FakeStorage>>) {
let chain = Arc::new(FakeStorage::default());
let log = build_log(slog::Level::Trace, log_enabled);
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10);
let range_sync = RangeSync::<TestBeaconChainType, Arc<FakeStorage>>::new(
chain.clone(),
beacon_processor_tx,
log.new(o!("component" => "range")),
);
let (network_tx, network_rx) = mpsc::unbounded_channel();
let globals = {
use lighthouse_network::discovery::enr_ext::CombinedKeyExt;
use lighthouse_network::discv5::enr::CombinedKey;
use lighthouse_network::discv5::enr::EnrBuilder;
use lighthouse_network::rpc::methods::{MetaData, MetaDataV2};
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let globals = NetworkGlobals::new(
enr,
9000,
9000,
MetaData::V2(MetaDataV2 {
seq_number: 0,
attnets: Default::default(),
syncnets: Default::default(),
}),
vec![],
&log,
);
Arc::new(globals)
};
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),
log.new(o!("component" => "network_context")),
);
let test_rig = TestRig {
log,
beacon_processor_rx,
chain,
cx,
network_rx,
globals,
};
(test_rig, range_sync)
}
#[test]
fn head_chain_removed_while_finalized_syncing() {
// NOTE: this is a regression test.
let (mut rig, mut range) = range(true);
// Get a peer with an advanced head
let (head_peer, local_info, remote_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, head_peer, remote_info);
range.assert_state(RangeSyncType::Head);
// Sync should have requested a batch, grab the request.
let _request = rig.grab_request(&head_peer);
// Now get a peer with an advanced finalized epoch.
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
range.assert_state(RangeSyncType::Finalized);
// Sync should have requested a batch, grab the request
let _second_request = rig.grab_request(&finalized_peer);
// Fail the head chain by disconnecting the peer.
range.remove_peer(&mut rig.cx, &head_peer);
range.assert_state(RangeSyncType::Finalized);
}
}