mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-29 20:27:14 +00:00
Report RPC Errors to the application on peer disconnections (#5680)
* Report RPC Errors to the application on peer disconnections
  Co-authored-by: Age Manning <Age@AgeManning.com>
* Expect RPCError::Disconnect to fail ongoing requests
* Drop lookups after peer disconnect and not awaiting events
* Allow RPCError disconnect through network service
* Update beacon_node/lighthouse_network/src/service/mod.rs
  Co-authored-by: Age Manning <Age@AgeManning.com>
* Merge branch 'unstable' into rpc-error-on-disconnect
This commit is contained in:
@@ -352,6 +352,31 @@ where
|
||||
!matches!(self.state, HandlerState::Deactivated)
|
||||
}
|
||||
|
||||
// NOTE: This function gets polled to completion upon a connection close.
|
||||
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
|
||||
// Inform the network behaviour of any failed requests
|
||||
|
||||
while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
|
||||
let outbound_info = self
|
||||
.outbound_substreams
|
||||
.remove(&substream_id)
|
||||
.expect("The value must exist for a key");
|
||||
// If the state of the connection is closing, we do not need to report this case to
|
||||
// the behaviour, as the connection has just closed non-gracefully
|
||||
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Register this request as an RPC Error
|
||||
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
|
||||
error: RPCError::Disconnected,
|
||||
proto: outbound_info.proto,
|
||||
id: outbound_info.req_id,
|
||||
})));
|
||||
}
|
||||
Poll::Ready(None)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
|
||||
@@ -972,6 +972,12 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
|
||||
.goodbye_peer(peer_id, reason, source);
|
||||
}
|
||||
|
||||
/// Hard (ungraceful) disconnect for testing purposes only
|
||||
/// Use goodbye_peer for disconnections, do not use this function.
|
||||
pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
|
||||
let _ = self.swarm.disconnect_peer_id(peer_id);
|
||||
}
|
||||
|
||||
/// Returns all ENR entries in the DHT.
|
||||
pub fn enr_entries(&self) -> Vec<Enr> {
|
||||
self.discovery().table_entries_enr()
|
||||
@@ -1373,12 +1379,18 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
|
||||
let peer_id = event.peer_id;
|
||||
|
||||
if !self.peer_manager().is_connected(&peer_id) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Ignoring rpc message of disconnecting peer";
|
||||
event
|
||||
);
|
||||
return None;
|
||||
// Sync expects an RPCError::Disconnected to drop associated lookups with this peer.
|
||||
// Silencing this event breaks the API contract with RPC where every request ends with
|
||||
// - A stream termination event, or
|
||||
// - An RPCError event
|
||||
if !matches!(event.event, HandlerEvent::Err(HandlerErr::Outbound { .. })) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Ignoring rpc message of disconnecting peer";
|
||||
event
|
||||
);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
let handler_id = event.conn_id;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
mod common;
|
||||
|
||||
use common::Protocol;
|
||||
use lighthouse_network::rpc::methods::*;
|
||||
use lighthouse_network::rpc::{methods::*, RPCError};
|
||||
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
|
||||
use slog::{debug, warn, Level};
|
||||
use ssz::Encode;
|
||||
@@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
})
|
||||
}
|
||||
|
||||
/// Checks that an outstanding request fails with `RPCError::Disconnected` when the remote
/// side hard-disconnects after receiving it.
#[test]
fn test_disconnect_triggers_rpc_error() {
    use futures::future::Either;

    // Logging is built at debug level but left disabled.
    let log = common::build_log(Level::Debug, false);
    let spec = E::default_spec();

    let rt = Arc::new(Runtime::new().unwrap());
    rt.block_on(async {
        // Build the connected sender/receiver pair over TCP.
        let (mut sender, mut receiver) = common::build_node_pair(
            Arc::downgrade(&rt),
            &log,
            ForkName::Base,
            &spec,
            Protocol::Tcp,
        )
        .await;

        // BlocksByRoot request. Must have at least one root for the request to create a stream.
        let roots = vec![Hash256::from_low_u64_be(0)];
        let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(roots, &spec));

        // Sender side: issue the request once connected, then finish as soon as that request
        // (id 42) is reported as failed due to the disconnection.
        let sender_future = async {
            loop {
                match sender.next_event().await {
                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                        debug!(log, "Sending RPC");
                        sender.send_request(peer_id, 42, rpc_request.clone());
                    }
                    NetworkEvent::RPCFailed {
                        error: RPCError::Disconnected,
                        id: 42,
                        ..
                    } => return,
                    NetworkEvent::RPCFailed { error, id: 42, .. } => {
                        panic!("received unexpected error {:?}", error)
                    }
                    other => {
                        warn!(log, "Ignoring other event {:?}", other);
                    }
                }
            }
        };

        // Peer the receiver still has to disconnect from. It is set when the request arrives
        // and consumed only after the event future below has been released.
        let mut sending_peer = None;
        let receiver_future = async {
            loop {
                // Wait for the next network event, or give up after one second so the loop
                // keeps turning.
                match futures::future::select(
                    Box::pin(receiver.next_event()),
                    Box::pin(tokio::time::sleep(Duration::from_secs(1))),
                )
                .await
                {
                    Either::Left((event, _)) => match event {
                        NetworkEvent::RequestReceived { peer_id, .. } => {
                            sending_peer = Some(peer_id);
                        }
                        other => {
                            warn!(log, "Ignoring other event {:?}", other);
                        }
                    },
                    // The timeout fired; nothing to handle.
                    Either::Right((_, _)) => {}
                }

                // A request has arrived: drop the connection without answering it.
                if let Some(peer_id) = sending_peer.take() {
                    warn!(log, "Receiver got request, disconnecting peer");
                    receiver.__hard_disconnect_testing_only(peer_id);
                }
            }
        };

        // Whichever future finishes first ends the test; 30 seconds without a result is a failure.
        tokio::select! {
            _ = sender_future => {}
            _ = receiver_future => {}
            _ = sleep(Duration::from_secs(30)) => {
                panic!("Future timed out");
            }
        }
    })
}
|
||||
|
||||
/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
|
||||
/// Goodbye message.
|
||||
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
|
||||
|
||||
@@ -307,11 +307,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
/// A peer has disconnected.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
||||
pub fn peer_disconnected(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
network: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), BackFillError> {
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
|
||||
if matches!(
|
||||
self.state(),
|
||||
BackFillState::Failed | BackFillState::NotRequired
|
||||
@@ -319,37 +315,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
|
||||
// fail the batches
|
||||
for id in batch_ids {
|
||||
if let Some(batch) = self.batches.get_mut(&id) {
|
||||
match batch.download_failed(false) {
|
||||
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
|
||||
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
|
||||
}
|
||||
Ok(BatchOperationOutcome::Continue) => {}
|
||||
Err(e) => {
|
||||
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
|
||||
}
|
||||
}
|
||||
// If we have run out of peers in which to retry this batch, the backfill state
|
||||
// transitions to a paused state.
|
||||
// We still need to reset the state for all the affected batches, so we should not
|
||||
// short circuit early
|
||||
if self.retry_batch_download(network, id).is_err() {
|
||||
debug!(
|
||||
self.log,
|
||||
"Batch could not be retried";
|
||||
"batch_id" => id,
|
||||
"error" => "no synced peers"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(self.log, "Batch not found while removing peer";
|
||||
"peer" => %peer_id, "batch" => id)
|
||||
}
|
||||
}
|
||||
}
|
||||
self.active_requests.remove(peer_id);
|
||||
|
||||
// Remove the peer from the participation list
|
||||
self.participating_peers.remove(peer_id);
|
||||
|
||||
@@ -382,16 +382,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
/* Error responses */
|
||||
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
|
||||
/* Check disconnection for single lookups */
|
||||
self.single_block_lookups.retain(|_, req| {
|
||||
let should_drop_lookup =
|
||||
req.should_drop_lookup_on_disconnected_peer(peer_id );
|
||||
|
||||
if should_drop_lookup {
|
||||
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?req.block_root());
|
||||
self.single_block_lookups.retain(|_, lookup| {
|
||||
if lookup.remove_peer(peer_id) {
|
||||
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
|
||||
!should_drop_lookup
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -186,21 +186,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
&& self.blob_request_state.state.is_processed()
|
||||
}
|
||||
|
||||
/// Checks both the block and blob request states to see if the peer is disconnected.
|
||||
///
|
||||
/// Returns true if the lookup should be dropped.
|
||||
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
|
||||
self.block_request_state.state.remove_peer(peer_id);
|
||||
self.blob_request_state.state.remove_peer(peer_id);
|
||||
|
||||
if self.all_available_peers().count() == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Note: if the peer disconnected happens to have an on-going request associated with this
|
||||
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
|
||||
// now.
|
||||
false
|
||||
/// Remove peer from available peers. Return true if there are no more available peers and all
|
||||
/// requests are not expecting any future event (AwaitingDownload).
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
|
||||
self.block_request_state.state.remove_peer(peer_id)
|
||||
&& self.blob_request_state.state.remove_peer(peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,9 +455,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
self.available_peers.insert(*peer_id)
|
||||
}
|
||||
|
||||
/// If a peer disconnects, this request could be failed. If so, an error is returned
|
||||
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
|
||||
/// Remove peer from available peers. Return true if there are no more available peers and the
|
||||
/// request is not expecting any future event (AwaitingDownload).
|
||||
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
|
||||
self.available_peers.remove(disconnected_peer_id);
|
||||
self.available_peers.is_empty() && self.is_awaiting_download()
|
||||
}
|
||||
|
||||
pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
|
||||
@@ -450,8 +450,25 @@ impl TestRig {
|
||||
})
|
||||
}
|
||||
|
||||
fn peer_disconnected(&mut self, peer_id: PeerId) {
|
||||
self.send_sync_message(SyncMessage::Disconnect(peer_id));
|
||||
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
|
||||
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
|
||||
|
||||
// Return RPCErrors for all active requests of peer
|
||||
self.drain_network_rx();
|
||||
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: RequestId::Sync(id),
|
||||
..
|
||||
} if *peer_id == disconnected_peer_id => Some(*id),
|
||||
_ => None,
|
||||
}) {
|
||||
self.send_sync_message(SyncMessage::RpcError {
|
||||
peer_id: disconnected_peer_id,
|
||||
request_id,
|
||||
error: RPCError::Disconnected,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_network_rx(&mut self) {
|
||||
|
||||
@@ -373,9 +373,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.range_sync.peer_disconnect(&mut self.network, peer_id);
|
||||
self.block_lookups.peer_disconnected(peer_id);
|
||||
// Regardless of the outcome, we update the sync status.
|
||||
let _ = self
|
||||
.backfill_sync
|
||||
.peer_disconnected(peer_id, &mut self.network);
|
||||
let _ = self.backfill_sync.peer_disconnected(peer_id);
|
||||
self.update_sync_state();
|
||||
}
|
||||
|
||||
|
||||
@@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
/// Removes a peer from the chain.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
pub fn remove_peer(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
network: &mut SyncNetworkContext<T>,
|
||||
) -> ProcessingResult {
|
||||
if let Some(batch_ids) = self.peers.remove(peer_id) {
|
||||
// fail the batches
|
||||
for id in batch_ids {
|
||||
if let Some(batch) = self.batches.get_mut(&id) {
|
||||
if let BatchOperationOutcome::Failed { blacklist } =
|
||||
batch.download_failed(true)?
|
||||
{
|
||||
return Err(RemoveChain::ChainFailed {
|
||||
blacklist,
|
||||
failing_batch: id,
|
||||
});
|
||||
}
|
||||
self.retry_batch_download(network, id)?;
|
||||
} else {
|
||||
debug!(self.log, "Batch not found while removing peer";
|
||||
"peer" => %peer_id, "batch" => id)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
|
||||
self.peers.remove(peer_id);
|
||||
|
||||
if self.peers.is_empty() {
|
||||
Err(RemoveChain::EmptyPeerPool)
|
||||
|
||||
@@ -278,9 +278,8 @@ where
|
||||
/// for this peer. If so we mark the batch as failed. The batch may then hit its maximum
|
||||
/// retries. In this case, we need to remove the chain.
|
||||
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
|
||||
for (removed_chain, sync_type, remove_reason) in self
|
||||
.chains
|
||||
.call_all(|chain| chain.remove_peer(peer_id, network))
|
||||
for (removed_chain, sync_type, remove_reason) in
|
||||
self.chains.call_all(|chain| chain.remove_peer(peer_id))
|
||||
{
|
||||
self.on_chain_removed(
|
||||
removed_chain,
|
||||
|
||||
Reference in New Issue
Block a user