mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-10 04:01:51 +00:00
Add timeouts to canonical head rwlock (#759)
* Add TimeoutRwLock to BeaconChain * Update network crate * Update rest api * Fix beacon chain tests * Fix rest api tests * Set test back to !debug_assertions
This commit is contained in:
@@ -45,9 +45,9 @@ impl From<StatusMessage> for PeerSyncInfo {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> From<&Arc<BeaconChain<T>>> for PeerSyncInfo {
|
||||
fn from(chain: &Arc<BeaconChain<T>>) -> PeerSyncInfo {
|
||||
Self::from(status_message(chain))
|
||||
impl PeerSyncInfo {
|
||||
pub fn from_chain<T: BeaconChainTypes>(chain: &Arc<BeaconChain<T>>) -> Option<PeerSyncInfo> {
|
||||
Some(Self::from(status_message(chain)?))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,8 +119,10 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
///
|
||||
/// Sends a `Status` message to the peer.
|
||||
pub fn on_connect(&mut self, peer_id: PeerId) {
|
||||
self.network
|
||||
.send_rpc_request(peer_id, RPCRequest::Status(status_message(&self.chain)));
|
||||
if let Some(status_message) = status_message(&self.chain) {
|
||||
self.network
|
||||
.send_rpc_request(peer_id, RPCRequest::Status(status_message));
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a `Status` request.
|
||||
@@ -135,12 +137,14 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
// ignore status responses if we are shutting down
|
||||
trace!(self.log, "StatusRequest"; "peer" => format!("{:?}", peer_id));
|
||||
|
||||
// Say status back.
|
||||
self.network.send_rpc_response(
|
||||
peer_id.clone(),
|
||||
request_id,
|
||||
RPCResponse::Status(status_message(&self.chain)),
|
||||
);
|
||||
if let Some(status_message) = status_message(&self.chain) {
|
||||
// Say status back.
|
||||
self.network.send_rpc_response(
|
||||
peer_id.clone(),
|
||||
request_id,
|
||||
RPCResponse::Status(status_message),
|
||||
);
|
||||
}
|
||||
|
||||
self.process_status(peer_id, status);
|
||||
}
|
||||
@@ -158,7 +162,16 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
/// Disconnects the peer if required.
|
||||
fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) {
|
||||
let remote = PeerSyncInfo::from(status);
|
||||
let local = PeerSyncInfo::from(&self.chain);
|
||||
let local = match PeerSyncInfo::from_chain(&self.chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
self.log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
|
||||
|
||||
@@ -191,8 +204,11 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
} else if remote.finalized_epoch <= local.finalized_epoch
|
||||
&& remote.finalized_root != Hash256::zero()
|
||||
&& local.finalized_root != Hash256::zero()
|
||||
&& (self.chain.root_at_slot(start_slot(remote.finalized_epoch))
|
||||
!= Some(remote.finalized_root))
|
||||
&& self
|
||||
.chain
|
||||
.root_at_slot(start_slot(remote.finalized_epoch))
|
||||
.map(|root_opt| root_opt != Some(remote.finalized_root))
|
||||
.unwrap_or_else(|_| false)
|
||||
{
|
||||
// The remotes finalized epoch is less than or greater than ours, but the block root is
|
||||
// different to the one in our chain.
|
||||
@@ -321,9 +337,21 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut block_roots = self
|
||||
let forwards_block_root_iter = match self
|
||||
.chain
|
||||
.forwards_iter_block_roots(Slot::from(req.start_slot))
|
||||
{
|
||||
Ok(iter) => iter,
|
||||
Err(e) => {
|
||||
return error!(
|
||||
self.log,
|
||||
"Unable to obtain root iter";
|
||||
"error" => format!("{:?}", e)
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let mut block_roots = forwards_block_root_iter
|
||||
.take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count * req.step)
|
||||
.step_by(req.step as usize)
|
||||
.map(|(root, _slot)| root)
|
||||
@@ -552,16 +580,18 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
}
|
||||
|
||||
/// Build a `StatusMessage` representing the state of the given `beacon_chain`.
|
||||
pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> StatusMessage {
|
||||
let state = &beacon_chain.head().beacon_state;
|
||||
pub(crate) fn status_message<T: BeaconChainTypes>(
|
||||
beacon_chain: &BeaconChain<T>,
|
||||
) -> Option<StatusMessage> {
|
||||
let head_info = beacon_chain.head_info().ok()?;
|
||||
|
||||
StatusMessage {
|
||||
fork_version: state.fork.current_version,
|
||||
finalized_root: state.finalized_checkpoint.root,
|
||||
finalized_epoch: state.finalized_checkpoint.epoch,
|
||||
head_root: beacon_chain.head().beacon_block_root,
|
||||
head_slot: state.slot,
|
||||
}
|
||||
Some(StatusMessage {
|
||||
fork_version: head_info.fork.current_version,
|
||||
finalized_root: head_info.finalized_checkpoint.root,
|
||||
finalized_epoch: head_info.finalized_checkpoint.epoch,
|
||||
head_root: head_info.block_root,
|
||||
head_slot: head_info.slot,
|
||||
})
|
||||
}
|
||||
|
||||
/// Wraps a Network Channel to employ various RPC related network functionality for the message
|
||||
|
||||
@@ -229,7 +229,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
};
|
||||
|
||||
let local = PeerSyncInfo::from(&chain);
|
||||
let local = match PeerSyncInfo::from_chain(&chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
self.log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync,
|
||||
// consider it a fully-sync'd peer.
|
||||
|
||||
@@ -43,7 +43,9 @@ impl SyncNetworkContext {
|
||||
"peer" => format!("{:?}", peer_id)
|
||||
);
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message(&chain)));
|
||||
if let Some(status_message) = status_message(&chain) {
|
||||
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::message_processor::PeerSyncInfo;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::PeerId;
|
||||
use slog::{debug, warn};
|
||||
use slog::{debug, error, warn};
|
||||
use std::sync::Weak;
|
||||
use types::EthSpec;
|
||||
use types::{Hash256, Slot};
|
||||
@@ -103,9 +103,22 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
/// updates the state of the collection.
|
||||
pub fn update_finalized(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) {
|
||||
let local_slot = match self.beacon_chain.upgrade() {
|
||||
Some(chain) => PeerSyncInfo::from(&chain)
|
||||
.finalized_epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
Some(chain) => {
|
||||
let local = match PeerSyncInfo::from_chain(&chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
local
|
||||
.finalized_epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch())
|
||||
}
|
||||
None => {
|
||||
warn!(log, "Beacon chain dropped. Chains not updated");
|
||||
return;
|
||||
@@ -113,7 +126,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
};
|
||||
|
||||
// Remove any outdated finalized chains
|
||||
self.purge_outdated_chains(network);
|
||||
self.purge_outdated_chains(network, log);
|
||||
|
||||
// Check if any chains become the new syncing chain
|
||||
if let Some(index) = self.finalized_syncing_index() {
|
||||
@@ -248,14 +261,23 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
///
|
||||
/// This removes chains with no peers, or chains whose start block slot is less than our current
|
||||
/// finalized block slot.
|
||||
pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) {
|
||||
pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) {
|
||||
// Remove any chains that have no peers
|
||||
self.finalized_chains
|
||||
.retain(|chain| !chain.peer_pool.is_empty());
|
||||
self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
|
||||
|
||||
let local_info = match self.beacon_chain.upgrade() {
|
||||
Some(chain) => PeerSyncInfo::from(&chain),
|
||||
Some(chain) => match PeerSyncInfo::from_chain(&chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::rpc::RequestId;
|
||||
use eth2_libp2p::PeerId;
|
||||
use slog::{debug, trace, warn};
|
||||
use slog::{debug, error, trace, warn};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Weak;
|
||||
use types::{BeaconBlock, EthSpec};
|
||||
@@ -106,7 +106,16 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
// determine if we need to run a sync to the nearest finalized state or simply sync to
|
||||
// its current head
|
||||
let local_info = match self.beacon_chain.upgrade() {
|
||||
Some(chain) => PeerSyncInfo::from(&chain),
|
||||
Some(chain) => match PeerSyncInfo::from_chain(&chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
self.log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
warn!(self.log,
|
||||
"Beacon chain dropped. Peer not considered for sync";
|
||||
@@ -127,7 +136,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
self.remove_peer(network, &peer_id);
|
||||
|
||||
// remove any out-of-date chains
|
||||
self.chains.purge_outdated_chains(network);
|
||||
self.chains.purge_outdated_chains(network, &self.log);
|
||||
|
||||
if remote_finalized_slot > local_info.head_slot {
|
||||
debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
|
||||
|
||||
Reference in New Issue
Block a user