Merge branch 'sync-bug-finder' into proto-array

This commit is contained in:
Paul Hauner
2020-01-17 16:12:42 +11:00
2 changed files with 19 additions and 8 deletions

View File

@@ -12,7 +12,7 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use slog::{crit, debug, error};
use slog::{crit, debug, error, warn};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
@@ -314,8 +314,12 @@ where
substream: out,
request,
};
self.outbound_substreams
.insert(id, (awaiting_stream, delay_key));
if let Some(_) = self
.outbound_substreams
.insert(id, (awaiting_stream, delay_key))
{
warn!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
}
}
_ => { // a response is not expected, drop the stream for all other requests
}

View File

@@ -18,7 +18,7 @@ use types::{BeaconBlock, EthSpec, Hash256, Slot};
/// to do so.
//TODO: Make this dynamic based on peer's bandwidth
//TODO: This is lower due to current thread design. Modify once rebuilt.
const BLOCKS_PER_BATCH: u64 = 25;
const BLOCKS_PER_BATCH: u64 = 50;
/// The number of times to retry a batch before the chain is considered failed and removed.
const MAX_BATCH_RETRIES: u8 = 5;
@@ -149,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch.
let batch = self.pending_batches.remove(&request_id)?;
trace!(log, "Batch downloaded"; "id" => batch.id, "request_id" => request_id);
Some(self.process_completed_batch(chain.clone(), network, batch, log))
}
}
@@ -429,7 +430,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(batch) = self.get_next_batch(peer_id) {
debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
// send the batch
self.send_batch(network, batch);
self.send_batch(network, batch, log);
return true;
}
}
@@ -449,11 +450,17 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
/// Requests the provided batch from the provided peer.
fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
fn send_batch(
&mut self,
network: &mut SyncNetworkContext,
batch: Batch<T::EthSpec>,
log: &slog::Logger,
) {
let request = batch.to_blocks_by_range_request();
if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
{
// add the batch to pending list
trace!(log, "Batch requested"; "id" => batch.id, "request_id" => request_id);
self.pending_batches.insert(request_id, batch);
}
}
@@ -504,7 +511,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
log: &slog::Logger,
) -> Option<ProcessingResult> {
if let Some(batch) = self.pending_batches.remove(&request_id) {
warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id));
warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id), "request_id" => request_id);
Some(self.failed_batch(network, batch, log))
} else {
@@ -542,7 +549,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.current_peer = new_peer.clone();
debug!(log, "Re-Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
self.send_batch(network, batch);
self.send_batch(network, batch, log);
ProcessingResult::KeepChain
}
}