mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 11:41:51 +00:00
Remove all batches related to a peer on disconnect (#5969)
* Remove all batches related to a peer on disconnect * Cleanup map entries after disconnect * Allow lookups to continue in case of disconnections * Pretty response types * fmt * Fix lints * Remove lookup if it cannot progress * Fix tests * Remove poll_close on rpc behaviour * Remove redundant test * Fix issue raised by lion * Revert pretty response types * Cleanup * Fix test * Merge remote-tracking branch 'origin/release-v5.2.1' into rpc-error-on-disconnect-revert * Apply suggestions from joao Co-authored-by: João Oliveira <hello@jxs.pt> * Fix log * update request status on no peers found * Do not remove lookup after peer disconnection * Add comments about expected event api * Update single_block_lookup.rs * Update mod.rs * Merge branch 'rpc-error-on-disconnect-revert' into 5969-review * Merge pull request #10 from dapplion/5969-review Add comments about expected event api
This commit is contained in:
@@ -352,37 +352,6 @@ where
|
||||
!matches!(self.state, HandlerState::Deactivated)
|
||||
}
|
||||
|
||||
// NOTE: This function gets polled to completion upon a connection close.
|
||||
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
|
||||
// Inform the network behaviour of any failed requests
|
||||
|
||||
while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
|
||||
let outbound_info = self
|
||||
.outbound_substreams
|
||||
.remove(&substream_id)
|
||||
.expect("The value must exist for a key");
|
||||
// If the state of the connection is closing, we do not need to report this case to
|
||||
// the behaviour, as the connection has just closed non-gracefully
|
||||
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Register this request as an RPC Error
|
||||
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
|
||||
error: RPCError::Disconnected,
|
||||
proto: outbound_info.proto,
|
||||
id: outbound_info.req_id,
|
||||
})));
|
||||
}
|
||||
|
||||
// Also handle any events that are awaiting to be sent to the behaviour
|
||||
if !self.events_out.is_empty() {
|
||||
return Poll::Ready(Some(self.events_out.remove(0)));
|
||||
}
|
||||
|
||||
Poll::Ready(None)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
mod common;
|
||||
|
||||
use common::Protocol;
|
||||
use lighthouse_network::rpc::{methods::*, RPCError};
|
||||
use lighthouse_network::rpc::methods::*;
|
||||
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
|
||||
use slog::{debug, warn, Level};
|
||||
use ssz::Encode;
|
||||
@@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_disconnect_triggers_rpc_error() {
|
||||
// set up the logging. The level and enabled logging or not
|
||||
let log_level = Level::Debug;
|
||||
let enable_logging = false;
|
||||
|
||||
let log = common::build_log(log_level, enable_logging);
|
||||
let spec = E::default_spec();
|
||||
|
||||
let rt = Arc::new(Runtime::new().unwrap());
|
||||
// get sender/receiver
|
||||
rt.block_on(async {
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
&log,
|
||||
ForkName::Base,
|
||||
&spec,
|
||||
Protocol::Tcp,
|
||||
)
|
||||
.await;
|
||||
|
||||
// BlocksByRoot Request
|
||||
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
|
||||
// Must have at least one root for the request to create a stream
|
||||
vec![Hash256::from_low_u64_be(0)],
|
||||
&spec,
|
||||
));
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
loop {
|
||||
match sender.next_event().await {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
// Send a STATUS message
|
||||
debug!(log, "Sending RPC");
|
||||
sender
|
||||
.send_request(peer_id, 42, rpc_request.clone())
|
||||
.unwrap();
|
||||
}
|
||||
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
|
||||
RPCError::Disconnected => return,
|
||||
other => panic!("received unexpected error {:?}", other),
|
||||
},
|
||||
other => {
|
||||
warn!(log, "Ignoring other event {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
|
||||
// messages
|
||||
let mut sending_peer = None;
|
||||
let receiver_future = async {
|
||||
loop {
|
||||
// this future either drives the sending/receiving or times out allowing messages to be
|
||||
// sent in the timeout
|
||||
match futures::future::select(
|
||||
Box::pin(receiver.next_event()),
|
||||
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
futures::future::Either::Left((ev, _)) => match ev {
|
||||
NetworkEvent::RequestReceived { peer_id, .. } => {
|
||||
sending_peer = Some(peer_id);
|
||||
}
|
||||
other => {
|
||||
warn!(log, "Ignoring other event {:?}", other);
|
||||
}
|
||||
},
|
||||
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
|
||||
}
|
||||
|
||||
// if we need to send messages send them here. This will happen after a delay
|
||||
if let Some(peer_id) = sending_peer.take() {
|
||||
warn!(log, "Receiver got request, disconnecting peer");
|
||||
receiver.__hard_disconnect_testing_only(peer_id);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = sender_future => {}
|
||||
_ = receiver_future => {}
|
||||
_ = sleep(Duration::from_secs(30)) => {
|
||||
panic!("Future timed out");
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
|
||||
/// Goodbye message.
|
||||
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
|
||||
|
||||
@@ -307,7 +307,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
/// A peer has disconnected.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
|
||||
pub fn peer_disconnected(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
network: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), BackFillError> {
|
||||
if matches!(
|
||||
self.state(),
|
||||
BackFillState::Failed | BackFillState::NotRequired
|
||||
@@ -315,7 +319,37 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.active_requests.remove(peer_id);
|
||||
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
|
||||
// fail the batches.
|
||||
for id in batch_ids {
|
||||
if let Some(batch) = self.batches.get_mut(&id) {
|
||||
match batch.download_failed(false) {
|
||||
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
|
||||
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
|
||||
}
|
||||
Ok(BatchOperationOutcome::Continue) => {}
|
||||
Err(e) => {
|
||||
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
|
||||
}
|
||||
}
|
||||
// If we have run out of peers in which to retry this batch, the backfill state
|
||||
// transitions to a paused state.
|
||||
// We still need to reset the state for all the affected batches, so we should not
|
||||
// short circuit early.
|
||||
if self.retry_batch_download(network, id).is_err() {
|
||||
debug!(
|
||||
self.log,
|
||||
"Batch could not be retried";
|
||||
"batch_id" => id,
|
||||
"error" => "no synced peers"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(self.log, "Batch not found while removing peer";
|
||||
"peer" => %peer_id, "batch" => id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the peer from the participation list
|
||||
self.participating_peers.remove(peer_id);
|
||||
|
||||
@@ -1,3 +1,25 @@
|
||||
//! Implements block lookup sync.
|
||||
//!
|
||||
//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about.
|
||||
//! For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync
|
||||
//! is recursive in nature, as we may discover that this attested head block root has a parent that
|
||||
//! is also unknown to us.
|
||||
//!
|
||||
//! Block lookup is implemented as an event-driven state machine. It sends events to the network and
|
||||
//! beacon processor, and expects some set of events back. A discrepancy in the expected event API
|
||||
//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event
|
||||
//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups
|
||||
//! that live for too long, logging the line "Notify the devs a sync lookup is stuck".
|
||||
//!
|
||||
//! The expected event API is documented in the code paths that are making assumptions with the
|
||||
//! comment prefix "Lookup sync event safety:"
|
||||
//!
|
||||
//! Block lookup sync attempts to not re-download or re-process data that we already have. Block
|
||||
//! components are cached temporarily in multiple places before they are imported into fork-choice.
|
||||
//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download
|
||||
//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state
|
||||
//! returned to this module as `LookupRequestResult` variants.
|
||||
|
||||
use self::parent_chain::{compute_parent_chains, NodeChain};
|
||||
pub use self::single_block_lookup::DownloadResult;
|
||||
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
|
||||
@@ -410,21 +432,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
/* Error responses */
|
||||
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
|
||||
self.single_block_lookups.retain(|_, lookup| {
|
||||
for (_, lookup) in self.single_block_lookups.iter_mut() {
|
||||
lookup.remove_peer(peer_id);
|
||||
|
||||
// Note: this condition should be removed in the future. It's not strictly necessary to drop a
|
||||
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
|
||||
if lookup.has_no_peers() {
|
||||
debug!(self.log,
|
||||
"Dropping single lookup after peer disconnection";
|
||||
"block_root" => ?lookup.block_root()
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/* Processing responses */
|
||||
@@ -787,12 +797,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
};
|
||||
|
||||
if stuck_lookup.id == ancestor_stuck_lookup.id {
|
||||
warn!(self.log, "Notify the devs, a sync lookup is stuck";
|
||||
warn!(self.log, "Notify the devs a sync lookup is stuck";
|
||||
"block_root" => ?stuck_lookup.block_root(),
|
||||
"lookup" => ?stuck_lookup,
|
||||
);
|
||||
} else {
|
||||
warn!(self.log, "Notify the devs, a sync lookup is stuck";
|
||||
warn!(self.log, "Notify the devs a sync lookup is stuck";
|
||||
"block_root" => ?stuck_lookup.block_root(),
|
||||
"lookup" => ?stuck_lookup,
|
||||
"ancestor_block_root" => ?ancestor_stuck_lookup.block_root(),
|
||||
|
||||
@@ -197,21 +197,36 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
|
||||
let Some(peer_id) = self.use_rand_available_peer() else {
|
||||
// Allow lookup to not have any peers. In that case do nothing. If the lookup does
|
||||
// not have peers for some time, it will be dropped.
|
||||
// Allow lookup to not have any peers and do nothing. This is an optimization to not
|
||||
// lose progress of lookups created from a block with unknown parent before we receive
|
||||
// attestations for said block.
|
||||
// Lookup sync event safety: If a lookup requires peers to make progress, and does
|
||||
// not receive any new peers for some time it will be dropped. If it receives a new
|
||||
// peer it must attempt to make progress.
|
||||
R::request_state_mut(self)
|
||||
.get_state_mut()
|
||||
.update_awaiting_download_status("no peers");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let request = R::request_state_mut(self);
|
||||
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
|
||||
LookupRequestResult::RequestSent(req_id) => {
|
||||
// Lookup sync event safety: If make_request returns `RequestSent`, we are
|
||||
// guaranteed that `BlockLookups::on_download_response` will be called exactly
|
||||
// with this `req_id`.
|
||||
request.get_state_mut().on_download_start(req_id)?
|
||||
}
|
||||
LookupRequestResult::NoRequestNeeded => {
|
||||
// Lookup sync event safety: Advances this request to the terminal `Processed`
|
||||
// state. If all requests reach this state, the request is marked as completed
|
||||
// in `Self::continue_requests`.
|
||||
request.get_state_mut().on_completed_request()?
|
||||
}
|
||||
// Sync will receive a future event to make progress on the request, do nothing now
|
||||
LookupRequestResult::Pending(reason) => {
|
||||
// Lookup sync event safety: Refer to the code paths constructing
|
||||
// `LookupRequestResult::Pending`
|
||||
request
|
||||
.get_state_mut()
|
||||
.update_awaiting_download_status(reason);
|
||||
@@ -222,16 +237,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
// Otherwise, attempt to progress awaiting processing
|
||||
// If this request is awaiting a parent lookup to be processed, do not send for processing.
|
||||
// The request will be rejected with unknown parent error.
|
||||
//
|
||||
// TODO: The condition `block_is_processed || Block` can be dropped after checking for
|
||||
// unknown parent root when import RPC blobs
|
||||
} else if !awaiting_parent
|
||||
&& (block_is_processed || matches!(R::response_type(), ResponseType::Block))
|
||||
{
|
||||
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
|
||||
// useful to conditionally access the result data.
|
||||
if let Some(result) = request.get_state_mut().maybe_start_processing() {
|
||||
// Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed
|
||||
// that `BlockLookups::on_processing_result` will be called exactly once with this
|
||||
// lookup_id
|
||||
return R::send_for_processing(id, result, cx);
|
||||
}
|
||||
// Lookup sync event safety: If the request is not in `AwaitingDownload` or
|
||||
// `AwaitingProcessing` state it is guaranteed to receive some event to make progress.
|
||||
}
|
||||
|
||||
// Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either:
|
||||
// (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent
|
||||
// lookup completes, or (2) get dropped if the parent fails and is dropped.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -246,10 +273,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.peers.insert(peer_id)
|
||||
}
|
||||
|
||||
/// Remove peer from available peers. Return true if there are no more available peers and all
|
||||
/// requests are not expecting any future event (AwaitingDownload).
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
|
||||
self.peers.remove(peer_id)
|
||||
/// Remove peer from available peers.
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) {
|
||||
self.peers.remove(peer_id);
|
||||
}
|
||||
|
||||
/// Returns true if this lookup has zero peers
|
||||
|
||||
@@ -290,6 +290,7 @@ impl TestRig {
|
||||
.0
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_no_active_single_lookups(&self) {
|
||||
assert!(
|
||||
self.active_single_lookups().is_empty(),
|
||||
@@ -298,6 +299,7 @@ impl TestRig {
|
||||
);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_no_active_lookups(&self) {
|
||||
self.expect_no_active_single_lookups();
|
||||
}
|
||||
@@ -539,10 +541,6 @@ impl TestRig {
|
||||
})
|
||||
}
|
||||
|
||||
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
|
||||
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
|
||||
}
|
||||
|
||||
/// Return RPCErrors for all active requests of peer
|
||||
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
|
||||
self.drain_network_rx();
|
||||
@@ -562,6 +560,10 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_disconnected(&mut self, peer_id: PeerId) {
|
||||
self.send_sync_message(SyncMessage::Disconnect(peer_id));
|
||||
}
|
||||
|
||||
fn drain_network_rx(&mut self) {
|
||||
while let Ok(event) = self.network_rx.try_recv() {
|
||||
self.network_rx_queue.push(event);
|
||||
@@ -1026,6 +1028,28 @@ fn test_single_block_lookup_failure() {
|
||||
rig.expect_empty_network();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_block_lookup_peer_disconnected_then_rpc_error() {
|
||||
let mut rig = TestRig::test_setup();
|
||||
|
||||
let block_hash = Hash256::random();
|
||||
let peer_id = rig.new_connected_peer();
|
||||
|
||||
// Trigger the request.
|
||||
rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
|
||||
let id = rig.expect_block_lookup_request(block_hash);
|
||||
|
||||
// The peer disconnect event reaches sync before the rpc error.
|
||||
rig.peer_disconnected(peer_id);
|
||||
// The lookup is not removed as it can still potentially make progress.
|
||||
rig.assert_single_lookups_count(1);
|
||||
// The request fails.
|
||||
rig.single_lookup_failed(id, peer_id, RPCError::Disconnected);
|
||||
rig.expect_block_lookup_request(block_hash);
|
||||
// The request should be removed from the network context on disconnection.
|
||||
rig.expect_empty_network();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_block_lookup_becomes_parent_request() {
|
||||
let mut rig = TestRig::test_setup();
|
||||
@@ -1289,19 +1313,9 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() {
|
||||
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
|
||||
rig.peer_disconnected(peer_id);
|
||||
rig.rpc_error_all_active_requests(peer_id);
|
||||
rig.expect_no_active_lookups();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
|
||||
let mut rig = TestRig::test_setup();
|
||||
let peer_id = rig.new_connected_peer();
|
||||
let trigger_block = rig.rand_block();
|
||||
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
|
||||
rig.peer_disconnected(peer_id);
|
||||
// Note: this test case may be removed in the future. It's not strictly necessary to drop a
|
||||
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
|
||||
rig.expect_no_active_lookups();
|
||||
// Erroring all rpc requests and disconnecting the peer shouldn't remove the requests
|
||||
// from the lookups map as they can still progress.
|
||||
rig.assert_single_lookups_count(2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use lighthouse_network::PeerId;
|
||||
use ssz_types::VariableList;
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
@@ -17,16 +18,19 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
|
||||
is_sidecars_stream_terminated: bool,
|
||||
/// Used to determine if this accumulator should wait for a sidecars stream termination
|
||||
request_type: ByRangeRequestType,
|
||||
/// The peer the request was made to.
|
||||
pub(crate) peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
|
||||
pub fn new(request_type: ByRangeRequestType) -> Self {
|
||||
pub fn new(request_type: ByRangeRequestType, peer_id: PeerId) -> Self {
|
||||
Self {
|
||||
accumulated_blocks: <_>::default(),
|
||||
accumulated_sidecars: <_>::default(),
|
||||
is_blocks_stream_terminated: <_>::default(),
|
||||
is_sidecars_stream_terminated: <_>::default(),
|
||||
request_type,
|
||||
peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,12 +113,14 @@ mod tests {
|
||||
use super::BlocksAndBlobsRequestInfo;
|
||||
use crate::sync::range_sync::ByRangeRequestType;
|
||||
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
|
||||
use lighthouse_network::PeerId;
|
||||
use rand::SeedableRng;
|
||||
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
|
||||
|
||||
#[test]
|
||||
fn no_blobs_into_responses() {
|
||||
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
|
||||
let peer_id = PeerId::random();
|
||||
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks, peer_id);
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
|
||||
@@ -133,7 +139,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn empty_blobs_into_responses() {
|
||||
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs);
|
||||
let peer_id = PeerId::random();
|
||||
let mut info =
|
||||
BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs, peer_id);
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| {
|
||||
|
||||
@@ -372,16 +372,39 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
Err(_) => self.update_sync_state(),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"RPC error for range request has no associated entry in network context, ungraceful disconnect";
|
||||
"peer_id" => %peer_id,
|
||||
"request_id" => %id,
|
||||
"error" => ?error,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a peer disconnect.
|
||||
///
|
||||
/// It is important that a peer disconnect retries all the batches/lookups as
|
||||
/// there is no way to guarantee that libp2p always emits a error along with
|
||||
/// the disconnect.
|
||||
fn peer_disconnect(&mut self, peer_id: &PeerId) {
|
||||
// Inject a Disconnected error on all requests associated with the disconnected peer
|
||||
// to retry all batches/lookups
|
||||
for request_id in self.network.peer_disconnected(peer_id) {
|
||||
self.inject_error(*peer_id, request_id, RPCError::Disconnected);
|
||||
}
|
||||
|
||||
// Remove peer from all data structures
|
||||
self.range_sync.peer_disconnect(&mut self.network, peer_id);
|
||||
let _ = self
|
||||
.backfill_sync
|
||||
.peer_disconnected(peer_id, &mut self.network);
|
||||
self.block_lookups.peer_disconnected(peer_id);
|
||||
|
||||
// Regardless of the outcome, we update the sync status.
|
||||
let _ = self.backfill_sync.peer_disconnected(peer_id);
|
||||
self.update_sync_state();
|
||||
}
|
||||
|
||||
@@ -951,7 +974,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.network.insert_range_blocks_and_blobs_request(
|
||||
id,
|
||||
resp.sender_id,
|
||||
BlocksAndBlobsRequestInfo::new(resp.request_type),
|
||||
BlocksAndBlobsRequestInfo::new(resp.request_type, peer_id),
|
||||
);
|
||||
// inform range that the request needs to be treated as failed
|
||||
// With time we will want to downgrade this log
|
||||
|
||||
@@ -177,6 +177,46 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the ids of all the requests made to the given peer_id.
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
|
||||
let failed_range_ids =
|
||||
self.range_blocks_and_blobs_requests
|
||||
.iter()
|
||||
.filter_map(|(id, request)| {
|
||||
if request.1.peer_id == *peer_id {
|
||||
Some(SyncRequestId::RangeBlockAndBlobs { id: *id })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
let failed_block_ids = self
|
||||
.blocks_by_root_requests
|
||||
.iter()
|
||||
.filter_map(|(id, request)| {
|
||||
if request.peer_id == *peer_id {
|
||||
Some(SyncRequestId::SingleBlock { id: *id })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
let failed_blob_ids = self
|
||||
.blobs_by_root_requests
|
||||
.iter()
|
||||
.filter_map(|(id, request)| {
|
||||
if request.peer_id == *peer_id {
|
||||
Some(SyncRequestId::SingleBlob { id: *id })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
failed_range_ids
|
||||
.chain(failed_block_ids)
|
||||
.chain(failed_blob_ids)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
|
||||
&self.network_beacon_processor.network_globals
|
||||
}
|
||||
@@ -272,8 +312,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
sender_id: RangeRequestId,
|
||||
) -> Result<Id, RpcRequestSendError> {
|
||||
let id = self.blocks_by_range_request(peer_id, batch_type, request)?;
|
||||
self.range_blocks_and_blobs_requests
|
||||
.insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type)));
|
||||
self.range_blocks_and_blobs_requests.insert(
|
||||
id,
|
||||
(
|
||||
sender_id,
|
||||
BlocksAndBlobsRequestInfo::new(batch_type, peer_id),
|
||||
),
|
||||
);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
@@ -343,7 +388,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
// Block is known are currently processing, expect a future event with the result of
|
||||
// processing.
|
||||
BlockProcessStatus::NotValidated { .. } => {
|
||||
return Ok(LookupRequestResult::Pending("block in processing cache"))
|
||||
// Lookup sync event safety: If the block is currently in the processing cache, we
|
||||
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
|
||||
// make progress on this lookup
|
||||
return Ok(LookupRequestResult::Pending("block in processing cache"));
|
||||
}
|
||||
// Block is fully validated. If it's not yet imported it's waiting for missing block
|
||||
// components. Consider this request completed and do nothing.
|
||||
@@ -366,6 +414,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
|
||||
let request = BlocksByRootSingleRequest(block_root);
|
||||
|
||||
// Lookup sync event safety: If network_send.send() returns Ok(_) we are guaranteed that
|
||||
// eventually at least one this 3 events will be received:
|
||||
// - StreamTermination(request_id): handled by `Self::on_single_block_response`
|
||||
// - RPCError(request_id): handled by `Self::on_single_block_response`
|
||||
// - Disconnect(peer_id) handled by `Self::peer_disconnected``which converts it to a
|
||||
// ` RPCError(request_id)`event handled by the above method
|
||||
self.network_send
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
@@ -375,7 +429,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
self.blocks_by_root_requests
|
||||
.insert(id, ActiveBlocksByRootRequest::new(request));
|
||||
.insert(id, ActiveBlocksByRootRequest::new(request, peer_id));
|
||||
|
||||
Ok(LookupRequestResult::RequestSent(req_id))
|
||||
}
|
||||
@@ -408,6 +462,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
// latter handle the case where if the peer sent no blobs, penalize.
|
||||
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
|
||||
// - if `num_expected_blobs` returns Some = block is processed.
|
||||
//
|
||||
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
|
||||
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
|
||||
// be downloaded yet or (2) the block is already imported into the fork-choice.
|
||||
// In case (1) the lookup must either successfully download the block or get dropped.
|
||||
// In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and
|
||||
// get dropped as completed.
|
||||
return Ok(LookupRequestResult::Pending("waiting for block download"));
|
||||
};
|
||||
|
||||
@@ -444,6 +505,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
indices,
|
||||
};
|
||||
|
||||
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
|
||||
self.network_send
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
@@ -453,7 +515,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
self.blobs_by_root_requests
|
||||
.insert(id, ActiveBlobsByRootRequest::new(request));
|
||||
.insert(id, ActiveBlobsByRootRequest::new(request, peer_id));
|
||||
|
||||
Ok(LookupRequestResult::RequestSent(req_id))
|
||||
}
|
||||
@@ -660,6 +722,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
|
||||
|
||||
debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id);
|
||||
// Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync
|
||||
// must receive a single `SyncMessage::BlockComponentProcessed` with this process type
|
||||
beacon_processor
|
||||
.send_rpc_beacon_block(
|
||||
block_root,
|
||||
@@ -689,6 +753,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
|
||||
|
||||
debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id);
|
||||
// Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
|
||||
// must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
|
||||
beacon_processor
|
||||
.send_rpc_blobs(
|
||||
block_root,
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use beacon_chain::get_block_root;
|
||||
use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest};
|
||||
use lighthouse_network::{
|
||||
rpc::{methods::BlobsByRootRequest, BlocksByRootRequest},
|
||||
PeerId,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use strum::IntoStaticStr;
|
||||
use types::{
|
||||
@@ -20,13 +23,15 @@ pub enum LookupVerifyError {
|
||||
pub struct ActiveBlocksByRootRequest {
|
||||
request: BlocksByRootSingleRequest,
|
||||
resolved: bool,
|
||||
pub(crate) peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl ActiveBlocksByRootRequest {
|
||||
pub fn new(request: BlocksByRootSingleRequest) -> Self {
|
||||
pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
|
||||
Self {
|
||||
request,
|
||||
resolved: false,
|
||||
peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,14 +99,16 @@ pub struct ActiveBlobsByRootRequest<E: EthSpec> {
|
||||
request: BlobsByRootSingleBlockRequest,
|
||||
blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||
resolved: bool,
|
||||
pub(crate) peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
|
||||
pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
|
||||
pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
|
||||
Self {
|
||||
request,
|
||||
blobs: vec![],
|
||||
resolved: false,
|
||||
peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -174,8 +174,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
/// Removes a peer from the chain.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
|
||||
self.peers.remove(peer_id);
|
||||
pub fn remove_peer(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
network: &mut SyncNetworkContext<T>,
|
||||
) -> ProcessingResult {
|
||||
if let Some(batch_ids) = self.peers.remove(peer_id) {
|
||||
// fail the batches.
|
||||
for id in batch_ids {
|
||||
if let Some(batch) = self.batches.get_mut(&id) {
|
||||
if let BatchOperationOutcome::Failed { blacklist } =
|
||||
batch.download_failed(true)?
|
||||
{
|
||||
return Err(RemoveChain::ChainFailed {
|
||||
blacklist,
|
||||
failing_batch: id,
|
||||
});
|
||||
}
|
||||
self.retry_batch_download(network, id)?;
|
||||
} else {
|
||||
debug!(self.log, "Batch not found while removing peer";
|
||||
"peer" => %peer_id, "batch" => id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.peers.is_empty() {
|
||||
Err(RemoveChain::EmptyPeerPool)
|
||||
|
||||
@@ -278,8 +278,9 @@ where
|
||||
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
|
||||
/// retries. In this case, we need to remove the chain.
|
||||
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
|
||||
for (removed_chain, sync_type, remove_reason) in
|
||||
self.chains.call_all(|chain| chain.remove_peer(peer_id))
|
||||
for (removed_chain, sync_type, remove_reason) in self
|
||||
.chains
|
||||
.call_all(|chain| chain.remove_peer(peer_id, network))
|
||||
{
|
||||
self.on_chain_removed(
|
||||
removed_chain,
|
||||
|
||||
Reference in New Issue
Block a user