diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index e7b225e16c..31d8e7e78f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -12,7 +12,7 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use slog::{crit, debug, error}; +use slog::{crit, debug, error, warn}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; @@ -314,8 +314,12 @@ where substream: out, request, }; - self.outbound_substreams - .insert(id, (awaiting_stream, delay_key)); + if let Some(_) = self + .outbound_substreams + .insert(id, (awaiting_stream, delay_key)) + { + warn!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); + } } _ => { // a response is not expected, drop the stream for all other requests } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index be2fcf2c82..d6b8bebda8 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -18,7 +18,7 @@ use types::{BeaconBlock, EthSpec, Hash256, Slot}; /// to do so. //TODO: Make this dynamic based on peer's bandwidth //TODO: This is lower due to current thread design. Modify once rebuilt. -const BLOCKS_PER_BATCH: u64 = 25; +const BLOCKS_PER_BATCH: u64 = 50; /// The number of times to retry a batch before the chain is considered failed and removed. const MAX_BATCH_RETRIES: u8 = 5; @@ -149,6 +149,7 @@ impl SyncingChain { } else { // A stream termination has been sent. This batch has ended. Process a completed batch. let batch = self.pending_batches.remove(&request_id)?; + trace!(log, "Batch downloaded"; "id" => batch.id, "request_id" => request_id); Some(self.process_completed_batch(chain.clone(), network, batch, log)) } } @@ -429,7 +430,7 @@ impl SyncingChain { if let Some(batch) = self.get_next_batch(peer_id) { debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root)); // send the batch - self.send_batch(network, batch); + self.send_batch(network, batch, log); return true; } } @@ -449,11 +450,17 @@ impl SyncingChain { } /// Requests the provided batch from the provided peer. - fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch) { + fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch: Batch, + log: &slog::Logger, + ) { let request = batch.to_blocks_by_range_request(); if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request) { // add the batch to pending list + trace!(log, "Batch requested"; "id" => batch.id, "request_id" => request_id); self.pending_batches.insert(request_id, batch); } } @@ -504,7 +511,7 @@ impl SyncingChain { log: &slog::Logger, ) -> Option { if let Some(batch) = self.pending_batches.remove(&request_id) { - warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id)); + warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id), "request_id" => request_id); Some(self.failed_batch(network, batch, log)) } else { @@ -542,7 +549,7 @@ impl SyncingChain { batch.current_peer = new_peer.clone(); debug!(log, "Re-Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root)); - self.send_batch(network, batch); + self.send_batch(network, batch, log); ProcessingResult::KeepChain } }