mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 21:34:46 +00:00
Merge branch 'master' into proto-array
This commit is contained in:
@@ -952,13 +952,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// Only log a warning if our head is in a reasonable place to verify this attestation.
|
||||
// This avoids excess logging during syncing.
|
||||
if head_epoch + 1 >= attestation_epoch {
|
||||
debug!(
|
||||
trace!(
|
||||
self.log,
|
||||
"Dropped attestation for unknown block";
|
||||
"block" => format!("{}", attestation.data.beacon_block_root)
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
trace!(
|
||||
self.log,
|
||||
"Dropped attestation for unknown block";
|
||||
"block" => format!("{}", attestation.data.beacon_block_root)
|
||||
|
||||
@@ -12,7 +12,7 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
|
||||
use libp2p::swarm::protocols_handler::{
|
||||
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||
};
|
||||
use slog::{crit, debug, error, trace};
|
||||
use slog::{crit, debug, error};
|
||||
use smallvec::SmallVec;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -57,7 +57,7 @@ where
|
||||
|
||||
/// Current inbound substreams awaiting processing.
|
||||
inbound_substreams:
|
||||
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSubstream>, delay_queue::Key)>,
|
||||
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSubstream>, Option<delay_queue::Key>)>,
|
||||
|
||||
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
|
||||
inbound_substreams_delay: DelayQueue<InboundRequestId>,
|
||||
@@ -134,6 +134,52 @@ pub enum OutboundSubstreamState<TSubstream> {
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
impl<TSubstream> InboundSubstreamState<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Moves the substream state to closing and informs the connected peer. The
|
||||
/// `queued_outbound_items` must be given as a parameter to add stream termination messages to
|
||||
/// the outbound queue.
|
||||
pub fn close(&mut self, outbound_queue: &mut Vec<RPCErrorResponse>) {
|
||||
// When terminating a stream, report the stream termination to the requesting user via
|
||||
// an RPC error
|
||||
let error = RPCErrorResponse::ServerError(ErrorMessage {
|
||||
error_message: "Request timed out".as_bytes().to_vec(),
|
||||
});
|
||||
|
||||
// The stream termination type is irrelevant, this will terminate the
|
||||
// stream
|
||||
let stream_termination =
|
||||
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
|
||||
|
||||
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
|
||||
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
|
||||
if !closing {
|
||||
outbound_queue.push(error);
|
||||
outbound_queue.push(stream_termination);
|
||||
}
|
||||
// if the stream is closing after the send, allow it to finish
|
||||
|
||||
*self = InboundSubstreamState::ResponsePendingSend { substream, closing }
|
||||
}
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
*self = InboundSubstreamState::ResponsePendingSend {
|
||||
substream: substream.send(error),
|
||||
closing: true,
|
||||
};
|
||||
}
|
||||
InboundSubstreamState::Closing(substream) => {
|
||||
// let the stream close
|
||||
*self = InboundSubstreamState::Closing(substream);
|
||||
}
|
||||
InboundSubstreamState::Poisoned => {
|
||||
unreachable!("Coding error: Timeout poisoned substream")
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> RPCHandler<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
@@ -233,7 +279,7 @@ where
|
||||
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
|
||||
self.inbound_substreams.insert(
|
||||
self.current_inbound_substream_id,
|
||||
(awaiting_stream, delay_key),
|
||||
(awaiting_stream, Some(delay_key)),
|
||||
);
|
||||
|
||||
self.events_out
|
||||
@@ -294,7 +340,7 @@ where
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
// close the stream if there is no response
|
||||
if let RPCErrorResponse::StreamTermination(_) = response {
|
||||
trace!(self.log, "Stream termination sent. Ending the stream");
|
||||
//trace!(self.log, "Stream termination sent. Ending the stream");
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
} else {
|
||||
// send the response
|
||||
@@ -442,47 +488,15 @@ where
|
||||
let rpc_id = stream_id.get_ref();
|
||||
|
||||
// handle a stream timeout for various states
|
||||
if let Some((substream_state, _)) = self.inbound_substreams.get_mut(rpc_id) {
|
||||
// When terminating a stream, report the stream termination to the requesting user via
|
||||
// an RPC error
|
||||
let error = RPCErrorResponse::ServerError(ErrorMessage {
|
||||
error_message: "Request timed out".as_bytes().to_vec(),
|
||||
});
|
||||
// The stream termination type is irrelevant, this will terminate the
|
||||
// stream
|
||||
let stream_termination =
|
||||
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
|
||||
if let Some((substream_state, delay_key)) = self.inbound_substreams.get_mut(rpc_id) {
|
||||
// the delay has been removed
|
||||
*delay_key = None;
|
||||
|
||||
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
|
||||
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
|
||||
if !closing {
|
||||
// if the stream is not closing, add an error to the stream queue and exit
|
||||
let queue = self
|
||||
.queued_outbound_items
|
||||
.entry(*rpc_id)
|
||||
.or_insert_with(Vec::new);
|
||||
queue.push(error);
|
||||
queue.push(stream_termination);
|
||||
}
|
||||
// if the stream is closing after the send, allow it to finish
|
||||
|
||||
*substream_state =
|
||||
InboundSubstreamState::ResponsePendingSend { substream, closing }
|
||||
}
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream: substream.send(error),
|
||||
closing: true,
|
||||
};
|
||||
}
|
||||
InboundSubstreamState::Closing(substream) => {
|
||||
// let the stream close
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
}
|
||||
InboundSubstreamState::Poisoned => {
|
||||
unreachable!("Coding error: Timeout poisoned substream")
|
||||
}
|
||||
};
|
||||
let outbound_queue = self
|
||||
.queued_outbound_items
|
||||
.entry(*rpc_id)
|
||||
.or_insert_with(Vec::new);
|
||||
substream_state.close(outbound_queue);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -546,8 +560,9 @@ where
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
let delay_key = &entry.get().1;
|
||||
self.inbound_substreams_delay.remove(delay_key);
|
||||
if let Some(delay_key) = &entry.get().1 {
|
||||
self.inbound_substreams_delay.remove(delay_key);
|
||||
}
|
||||
entry.remove_entry();
|
||||
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||
RPCEvent::Error(0, e),
|
||||
@@ -565,10 +580,11 @@ where
|
||||
InboundSubstreamState::Closing(mut substream) => {
|
||||
match substream.close() {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
trace!(self.log, "Inbound stream dropped");
|
||||
let delay_key = &entry.get().1;
|
||||
//trace!(self.log, "Inbound stream dropped");
|
||||
if let Some(delay_key) = &entry.get().1 {
|
||||
self.inbound_substreams_delay.remove(delay_key);
|
||||
}
|
||||
self.queued_outbound_items.remove(&request_id);
|
||||
self.inbound_substreams_delay.remove(delay_key);
|
||||
entry.remove();
|
||||
} // drop the stream
|
||||
Ok(Async::NotReady) => {
|
||||
@@ -613,7 +629,7 @@ where
|
||||
} else {
|
||||
// either this is a single response request or we received an
|
||||
// error
|
||||
trace!(self.log, "Closing single stream request");
|
||||
//trace!(self.log, "Closing single stream request");
|
||||
// only expect a single response, close the stream
|
||||
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
|
||||
}
|
||||
@@ -626,7 +642,7 @@ where
|
||||
// stream closed
|
||||
// if we expected multiple streams send a stream termination,
|
||||
// else report the stream terminating only.
|
||||
trace!(self.log, "RPC Response - stream closed by remote");
|
||||
//trace!(self.log, "RPC Response - stream closed by remote");
|
||||
// drop the stream
|
||||
let delay_key = &entry.get().1;
|
||||
self.outbound_substreams_delay.remove(delay_key);
|
||||
@@ -670,7 +686,7 @@ where
|
||||
},
|
||||
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
trace!(self.log, "Outbound stream dropped");
|
||||
//trace!(self.log, "Outbound stream dropped");
|
||||
// drop the stream
|
||||
let delay_key = &entry.get().1;
|
||||
self.outbound_substreams_delay.remove(delay_key);
|
||||
|
||||
@@ -120,6 +120,16 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
/// Sends a `Status` message to the peer.
|
||||
pub fn on_connect(&mut self, peer_id: PeerId) {
|
||||
if let Some(status_message) = status_message(&self.chain) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Sending Status Request";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"fork_version" => format!("{:?}", status_message.fork_version),
|
||||
"finalized_root" => format!("{:?}", status_message.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
|
||||
"head_root" => format!("{}", status_message.head_root),
|
||||
"head_slot" => format!("{}", status_message.head_slot),
|
||||
);
|
||||
self.network
|
||||
.send_rpc_request(peer_id, RPCRequest::Status(status_message));
|
||||
}
|
||||
@@ -134,9 +144,18 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
request_id: RequestId,
|
||||
status: StatusMessage,
|
||||
) {
|
||||
// ignore status responses if we are shutting down
|
||||
trace!(self.log, "StatusRequest"; "peer" => format!("{:?}", peer_id));
|
||||
debug!(
|
||||
self.log,
|
||||
"Received Status Request";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"fork_version" => format!("{:?}", status.fork_version),
|
||||
"finalized_root" => format!("{:?}", status.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status.finalized_epoch),
|
||||
"head_root" => format!("{}", status.head_root),
|
||||
"head_slot" => format!("{}", status.head_slot),
|
||||
);
|
||||
|
||||
// ignore status responses if we are shutting down
|
||||
if let Some(status_message) = status_message(&self.chain) {
|
||||
// Say status back.
|
||||
self.network.send_rpc_response(
|
||||
@@ -550,7 +569,7 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
|
||||
}
|
||||
AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => {
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
trace!(
|
||||
self.log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => format!("{:?}", peer_id),
|
||||
|
||||
@@ -237,7 +237,7 @@ fn network_service(
|
||||
match libp2p_service.lock().poll() {
|
||||
Ok(Async::Ready(Some(event))) => match event {
|
||||
Libp2pEvent::RPC(peer_id, rpc_event) => {
|
||||
trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event));
|
||||
// trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event));
|
||||
|
||||
// if we received a Goodbye message, drop and ban the peer
|
||||
if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event {
|
||||
|
||||
@@ -36,14 +36,19 @@ impl SyncNetworkContext {
|
||||
chain: Weak<BeaconChain<T>>,
|
||||
peer_id: PeerId,
|
||||
) {
|
||||
trace!(
|
||||
self.log,
|
||||
"Sending Status Request";
|
||||
"method" => "STATUS",
|
||||
"peer" => format!("{:?}", peer_id)
|
||||
);
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
if let Some(status_message) = status_message(&chain) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Sending Status Request";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"fork_version" => format!("{:?}", status_message.fork_version),
|
||||
"finalized_root" => format!("{:?}", status_message.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
|
||||
"head_root" => format!("{}", status_message.head_root),
|
||||
"head_slot" => format!("{}", status_message.head_slot),
|
||||
);
|
||||
|
||||
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message));
|
||||
}
|
||||
}
|
||||
@@ -124,8 +129,7 @@ impl SyncNetworkContext {
|
||||
self.network_send
|
||||
.try_send(NetworkMessage::RPC(peer_id, rpc_event))
|
||||
.map_err(|_| {
|
||||
// This is likely to happen when shutting down. Suppress this warning to trace for now
|
||||
trace!(
|
||||
debug!(
|
||||
self.log,
|
||||
"Could not send RPC message to the network service"
|
||||
);
|
||||
|
||||
107
beacon_node/network/src/sync/range_sync/batch.rs
Normal file
107
beacon_node/network/src/sync/range_sync/batch.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
use eth2_libp2p::rpc::RequestId;
|
||||
use eth2_libp2p::PeerId;
|
||||
use fnv::FnvHashMap;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use types::{BeaconBlock, EthSpec, Hash256, Slot};
|
||||
|
||||
/// A collection of sequential blocks that are requested from peers in a single RPC request.
|
||||
#[derive(PartialEq)]
|
||||
pub struct Batch<T: EthSpec> {
|
||||
/// The ID of the batch, these are sequential.
|
||||
pub id: u64,
|
||||
/// The requested start slot of the batch, inclusive.
|
||||
pub start_slot: Slot,
|
||||
/// The requested end slot of batch, exclusive.
|
||||
pub end_slot: Slot,
|
||||
/// The hash of the chain root to requested from the peer.
|
||||
pub head_root: Hash256,
|
||||
/// The peer that was originally assigned to the batch.
|
||||
pub _original_peer: PeerId,
|
||||
/// The peer that is currently assigned to the batch.
|
||||
pub current_peer: PeerId,
|
||||
/// The number of retries this batch has undergone.
|
||||
pub retries: u8,
|
||||
/// The blocks that have been downloaded.
|
||||
pub downloaded_blocks: Vec<BeaconBlock<T>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> Ord for Batch<T> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.id.cmp(&other.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> PartialOrd for Batch<T> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
/// A structure that contains a mapping of pending batch requests, that also keeps track of which
|
||||
/// peers are currently making batch requests.
|
||||
///
|
||||
/// This is used to optimise searches for idle peers (peers that have no outbound batch requests).
|
||||
pub struct PendingBatches<T: EthSpec> {
|
||||
/// The current pending batches.
|
||||
batches: FnvHashMap<RequestId, Batch<T>>,
|
||||
/// A mapping of peers to the number of pending requests.
|
||||
peer_requests: HashMap<PeerId, HashSet<RequestId>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> PendingBatches<T> {
|
||||
pub fn new() -> Self {
|
||||
PendingBatches {
|
||||
batches: FnvHashMap::default(),
|
||||
peer_requests: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, request_id: RequestId, batch: Batch<T>) -> Option<Batch<T>> {
|
||||
let peer_request = batch.current_peer.clone();
|
||||
self.peer_requests
|
||||
.entry(peer_request)
|
||||
.or_insert_with(|| HashSet::new())
|
||||
.insert(request_id);
|
||||
self.batches.insert(request_id, batch)
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, request_id: &RequestId) -> Option<Batch<T>> {
|
||||
if let Some(batch) = self.batches.remove(request_id) {
|
||||
if let Entry::Occupied(mut entry) = self.peer_requests.entry(batch.current_peer.clone())
|
||||
{
|
||||
entry.get_mut().remove(request_id);
|
||||
|
||||
if entry.get().is_empty() {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
Some(batch)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a block to the batches if the request id exists. Returns None if there is no batch
|
||||
/// matching the request id.
|
||||
pub fn add_block(&mut self, request_id: &RequestId, block: BeaconBlock<T>) -> Option<()> {
|
||||
let batch = self.batches.get_mut(request_id)?;
|
||||
batch.downloaded_blocks.push(block);
|
||||
Some(())
|
||||
}
|
||||
|
||||
/// Returns true if there the peer does not exist in the peer_requests mapping. Indicating it
|
||||
/// has no pending outgoing requests.
|
||||
pub fn peer_is_idle(&self, peer_id: &PeerId) -> bool {
|
||||
self.peer_requests.get(peer_id).is_none()
|
||||
}
|
||||
|
||||
/// Removes a batch for a given peer.
|
||||
pub fn remove_batch_by_peer(&mut self, peer_id: &PeerId) -> Option<Batch<T>> {
|
||||
let request_ids = self.peer_requests.get(peer_id)?;
|
||||
|
||||
let request_id = request_ids.iter().next()?.clone();
|
||||
self.remove(&request_id)
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,11 @@
|
||||
use crate::message_processor::FUTURE_SLOT_TOLERANCE;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::range_sync::batch::{Batch, PendingBatches};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
|
||||
use eth2_libp2p::rpc::methods::*;
|
||||
use eth2_libp2p::rpc::RequestId;
|
||||
use eth2_libp2p::PeerId;
|
||||
use fnv::FnvHashMap;
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashSet;
|
||||
use std::ops::Sub;
|
||||
use std::sync::Weak;
|
||||
@@ -18,44 +17,12 @@ use types::{BeaconBlock, EthSpec, Hash256, Slot};
|
||||
/// responder will fill the response up to the max request size, assuming they have the bandwidth
|
||||
/// to do so.
|
||||
//TODO: Make this dynamic based on peer's bandwidth
|
||||
const BLOCKS_PER_BATCH: u64 = 50;
|
||||
//TODO: This is lower due to current thread design. Modify once rebuilt.
|
||||
const BLOCKS_PER_BATCH: u64 = 25;
|
||||
|
||||
/// The number of times to retry a batch before the chain is considered failed and removed.
|
||||
const MAX_BATCH_RETRIES: u8 = 5;
|
||||
|
||||
/// A collection of sequential blocks that are requested from peers in a single RPC request.
|
||||
#[derive(PartialEq)]
|
||||
pub struct Batch<T: EthSpec> {
|
||||
/// The ID of the batch, batches are ID's sequentially.
|
||||
id: u64,
|
||||
/// The requested start slot of the batch, inclusive.
|
||||
start_slot: Slot,
|
||||
/// The requested end slot of batch, exclusive.
|
||||
end_slot: Slot,
|
||||
/// The hash of the chain root to requested from the peer.
|
||||
head_root: Hash256,
|
||||
/// The peer that was originally assigned to the batch.
|
||||
_original_peer: PeerId,
|
||||
/// The peer that is currently assigned to the batch.
|
||||
pub current_peer: PeerId,
|
||||
/// The number of retries this batch has undergone.
|
||||
retries: u8,
|
||||
/// The blocks that have been downloaded.
|
||||
downloaded_blocks: Vec<BeaconBlock<T>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> Ord for Batch<T> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.id.cmp(&other.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> PartialOrd for Batch<T> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
/// A return type for functions that act on a `Chain` which informs the caller whether the chain
|
||||
/// has been completed and should be removed or to be kept if further processing is
|
||||
/// required.
|
||||
@@ -105,7 +72,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
|
||||
|
||||
/// The batches that are currently awaiting a response from a peer. An RPC request for these
|
||||
/// have been sent.
|
||||
pub pending_batches: FnvHashMap<RequestId, Batch<T::EthSpec>>,
|
||||
pub pending_batches: PendingBatches<T::EthSpec>,
|
||||
|
||||
/// The batches that have been downloaded and are awaiting processing and/or validation.
|
||||
completed_batches: Vec<Batch<T::EthSpec>>,
|
||||
@@ -151,7 +118,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
start_slot,
|
||||
target_head_slot,
|
||||
target_head_root,
|
||||
pending_batches: FnvHashMap::default(),
|
||||
pending_batches: PendingBatches::new(),
|
||||
completed_batches: Vec::new(),
|
||||
peer_pool,
|
||||
to_be_downloaded_id: 1,
|
||||
@@ -176,9 +143,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
log: &slog::Logger,
|
||||
) -> Option<ProcessingResult> {
|
||||
if let Some(block) = beacon_block {
|
||||
let batch = self.pending_batches.get_mut(&request_id)?;
|
||||
// This is not a stream termination, simply add the block to the request
|
||||
batch.downloaded_blocks.push(block.clone());
|
||||
self.pending_batches.add_block(&request_id, block.clone())?;
|
||||
return Some(ProcessingResult::KeepChain);
|
||||
} else {
|
||||
// A stream termination has been sent. This batch has ended. Process a completed batch.
|
||||
@@ -202,10 +168,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
// blocks for the peer.
|
||||
debug!(log, "Completed batch received"; "id"=>batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
|
||||
|
||||
// The peer that completed this batch, may be re-requested if this batch doesn't complete
|
||||
// the chain and there is no error in processing
|
||||
let current_peer = batch.current_peer.clone();
|
||||
|
||||
// verify the range of received blocks
|
||||
// Note that the order of blocks is verified in block processing
|
||||
if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot) {
|
||||
@@ -244,7 +206,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
if self.state == ChainSyncingState::Syncing {
|
||||
// pre-emptively request more blocks from peers whilst we process current blocks,
|
||||
self.send_range_request(network, current_peer, log);
|
||||
if !self.send_range_request(network, log) {
|
||||
debug!(log, "No peer available for next batch.")
|
||||
}
|
||||
}
|
||||
|
||||
// Try and process batches sequentially in the ordered list.
|
||||
@@ -424,38 +388,22 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
.retain(|batch| batch.id >= last_processed_id.saturating_sub(1));
|
||||
}
|
||||
|
||||
// Now begin requesting blocks from the peer pool. Ignore any peers with currently
|
||||
// pending requests
|
||||
let pending_peers = self
|
||||
.pending_batches
|
||||
.values()
|
||||
.map(|batch| batch.current_peer.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let peers = self
|
||||
.peer_pool
|
||||
.iter()
|
||||
.filter(|peer| !pending_peers.contains(peer))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for peer_id in peers {
|
||||
// send a blocks by range request to the peer
|
||||
self.send_range_request(network, peer_id, log);
|
||||
}
|
||||
// Now begin requesting blocks from the peer pool, until all peers are exhausted.
|
||||
while self.send_range_request(network, log) {}
|
||||
|
||||
self.state = ChainSyncingState::Syncing;
|
||||
}
|
||||
|
||||
/// A peer has been added.
|
||||
/// Add a peer to the chain.
|
||||
///
|
||||
/// If the chain is active, this starts requesting batches from this peer.
|
||||
pub fn peer_added(
|
||||
pub fn add_peer(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
peer_id: PeerId,
|
||||
log: &slog::Logger,
|
||||
) {
|
||||
self.peer_pool.insert(peer_id.clone());
|
||||
// do not request blocks if the chain is not syncing
|
||||
if let ChainSyncingState::Stopped = self.state {
|
||||
debug!(log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
|
||||
@@ -463,7 +411,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
|
||||
// find the next batch and request it from the peer
|
||||
self.send_range_request(network, peer_id, log);
|
||||
self.send_range_request(network, log);
|
||||
}
|
||||
|
||||
/// Sends a STATUS message to all peers in the peer pool.
|
||||
@@ -473,19 +421,31 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the next required batch from the provided peer.
|
||||
fn send_range_request(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
peer_id: PeerId,
|
||||
log: &slog::Logger,
|
||||
) {
|
||||
/// Requests the next required batch from a peer. Returns true, if there was a peer available
|
||||
/// to send a request and there are batches to request, false otherwise.
|
||||
fn send_range_request(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) -> bool {
|
||||
// find the next pending batch and request it from the peer
|
||||
if let Some(batch) = self.get_next_batch(peer_id) {
|
||||
debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
|
||||
// send the batch
|
||||
self.send_batch(network, batch);
|
||||
if let Some(peer_id) = self.get_next_peer() {
|
||||
if let Some(batch) = self.get_next_batch(peer_id) {
|
||||
debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
|
||||
// send the batch
|
||||
self.send_batch(network, batch);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns a peer if there exists a peer which does not currently have a pending request.
|
||||
///
|
||||
/// This is used to create the next request.
|
||||
fn get_next_peer(&self) -> Option<PeerId> {
|
||||
for peer in self.peer_pool.iter() {
|
||||
if self.pending_batches.peer_is_idle(peer) {
|
||||
return Some(peer.clone());
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Requests the provided batch from the provided peer.
|
||||
@@ -546,7 +506,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
if let Some(batch) = self.pending_batches.remove(&request_id) {
|
||||
warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id));
|
||||
|
||||
Some(self.failed_batch(network, batch))
|
||||
Some(self.failed_batch(network, batch, log))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -561,6 +521,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
mut batch: Batch<T::EthSpec>,
|
||||
log: &Logger,
|
||||
) -> ProcessingResult {
|
||||
batch.retries += 1;
|
||||
|
||||
@@ -580,6 +541,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
.unwrap_or_else(|| current_peer);
|
||||
|
||||
batch.current_peer = new_peer.clone();
|
||||
debug!(log, "Re-Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
|
||||
self.send_batch(network, batch);
|
||||
ProcessingResult::KeepChain
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! This provides the logic for syncing a chain when the local node is far behind it's current
|
||||
//! peers.
|
||||
|
||||
mod batch;
|
||||
mod chain;
|
||||
mod chain_collection;
|
||||
mod range;
|
||||
|
||||
@@ -154,8 +154,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot);
|
||||
|
||||
// add the peer to the chain's peer pool
|
||||
chain.peer_pool.insert(peer_id.clone());
|
||||
chain.peer_added(network, peer_id, &self.log);
|
||||
chain.add_peer(network, peer_id, &self.log);
|
||||
|
||||
// check if the new peer's addition will favour a new syncing chain.
|
||||
self.chains.update_finalized(network, &self.log);
|
||||
@@ -189,8 +188,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id));
|
||||
|
||||
// add the peer to the head's pool
|
||||
chain.peer_pool.insert(peer_id.clone());
|
||||
chain.peer_added(network, peer_id.clone(), &self.log);
|
||||
chain.add_peer(network, peer_id.clone(), &self.log);
|
||||
} else {
|
||||
// There are no other head chains that match this peer's status, create a new one, and
|
||||
let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot);
|
||||
@@ -306,22 +304,16 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
|
||||
/// retries. In this case, we need to remove the chain and re-status all the peers.
|
||||
fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {
|
||||
let log_ref = &self.log;
|
||||
match self.chains.head_finalized_request(|chain| {
|
||||
if chain.peer_pool.remove(&peer_id) {
|
||||
if chain.peer_pool.remove(peer_id) {
|
||||
// this chain contained the peer
|
||||
let pending_batches_requests = chain
|
||||
.pending_batches
|
||||
.iter()
|
||||
.filter(|(_, batch)| batch.current_peer == *peer_id)
|
||||
.map(|(id, _)| id)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
for request_id in pending_batches_requests {
|
||||
if let Some(batch) = chain.pending_batches.remove(&request_id) {
|
||||
if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) {
|
||||
// a single batch failed, remove the chain
|
||||
return Some(ProcessingResult::RemoveChain);
|
||||
}
|
||||
while let Some(batch) = chain.pending_batches.remove_batch_by_peer(peer_id) {
|
||||
if let ProcessingResult::RemoveChain =
|
||||
chain.failed_batch(network, batch, log_ref)
|
||||
{
|
||||
// a single batch failed, remove the chain
|
||||
return Some(ProcessingResult::RemoveChain);
|
||||
}
|
||||
}
|
||||
// peer removed from chain, no batch failed
|
||||
|
||||
Reference in New Issue
Block a user