From 0d56df474a6df70353a89970329f3c08068eef23 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sun, 25 Aug 2019 00:27:47 +1000 Subject: [PATCH] Main batch sync debugging --- beacon_node/client/src/lib.rs | 6 +- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 20 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 4 +- beacon_node/network/src/message_handler.rs | 6 +- beacon_node/network/src/sync/manager.rs | 240 +++++++++---------- beacon_node/network/src/sync/simple_sync.rs | 99 +++++--- 6 files changed, 219 insertions(+), 156 deletions(-) diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 4b64c10705..7e6449a98d 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -41,7 +41,7 @@ pub struct Client { /// Signal to terminate the slot timer. pub slot_timer_exit_signal: Option, /// Signal to terminate the API - pub api_exit_signal: Option, + // pub api_exit_signal: Option, /// The clients logger. log: slog::Logger, /// Marker to pin the beacon chain generics. @@ -134,6 +134,7 @@ where None }; + /* // Start the `rest_api` service let api_exit_signal = if client_config.rest_api.enabled { match rest_api::start_server( @@ -151,6 +152,7 @@ where } else { None }; + */ let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { @@ -184,7 +186,7 @@ where http_exit_signal, rpc_exit_signal, slot_timer_exit_signal: Some(slot_timer_exit_signal), - api_exit_signal, + //api_exit_signal, log, network, phantom: PhantomData, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index f7262118d6..260a00346c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -171,7 +171,25 @@ impl Decoder for SSZOutboundCodec { }, _ => unreachable!("Cannot negotiate an unknown protocol"), }, - Ok(None) => Ok(None), + Ok(None) => { + // the object sent could be a empty. We return the empty object if this is the case + match self.protocol.message_name.as_str() { + "hello" => match self.protocol.version.as_str() { + "1" => Ok(None), // cannot have an empty HELLO message. The stream has terminated unexpectedly + _ => unreachable!("Cannot negotiate an unknown version"), + }, + "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), + "beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + "recent_beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + _ => unreachable!("Cannot negotiate an unknown protocol"), + } + } Err(e) => Err(e), } } diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index a69cd0cda9..07322875ff 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -317,11 +317,11 @@ where RPCEvent::Response(rpc_event.id(), response), ))); } else { - // stream closed early + // stream closed early or nothing was sent return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error( rpc_event.id(), - RPCError::Custom("Stream Closed Early".into()), + RPCError::Custom("Stream closed early. Empty response".into()), ), ))); } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 7a1a4ad317..c14fc970d7 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,8 +1,7 @@ use crate::error; -use crate::service::{NetworkMessage, OutgoingMessage}; +use crate::service::NetworkMessage; use crate::sync::SimpleSync; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::methods::*; use eth2_libp2p::{ behaviour::PubsubMessage, rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId}, @@ -304,6 +303,9 @@ impl MessageHandler { &self, beacon_blocks: &[u8], ) -> Result>, DecodeError> { + if beacon_blocks.is_empty() { + return Ok(Vec::new()); + } //TODO: Implement faster block verification before decoding entirely Vec::from_ssz_bytes(&beacon_blocks) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f5c6694557..b81da0991f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -13,14 +13,12 @@ const MAX_BLOCKS_PER_REQUEST: u64 = 10; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: usize = 10; - const PARENT_FAIL_TOLERANCE: usize = 3; const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; #[derive(PartialEq)] enum BlockRequestsState { - QueuedForward, - QueuedBackward, + Queued, Pending(RequestId), Complete, Failed, @@ -31,6 +29,10 @@ struct BlockRequests { target_head_root: Hash256, downloaded_blocks: Vec>, state: BlockRequestsState, + /// Specifies whether the current state is syncing forwards or backwards. + forward_sync: bool, + /// The current `start_slot` of the batched block request. + current_start_slot: Slot, } struct ParentRequests { @@ -43,25 +45,13 @@ struct ParentRequests { impl BlockRequests { // gets the start slot for next batch // last block slot downloaded plus 1 - fn next_start_slot(&self) -> Option { - if !self.downloaded_blocks.is_empty() { - match self.state { - BlockRequestsState::QueuedForward => { - let last_element_index = self.downloaded_blocks.len() - 1; - Some(self.downloaded_blocks[last_element_index].slot.add(1)) - } - BlockRequestsState::QueuedBackward => { - let earliest_known_slot = self.downloaded_blocks[0].slot; - Some(earliest_known_slot.add(1).sub(MAX_BLOCKS_PER_REQUEST)) - } - _ => { - // pending/complete/failed - None - } - } + fn update_start_slot(&mut self) { + if self.forward_sync { + self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); } else { - None + self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); } + self.state = BlockRequestsState::Queued; } } @@ -117,7 +107,7 @@ impl ImportManager { let local = PeerSyncInfo::from(&self.chain); - // If a peer is within SLOT_IMPORT_TOLERANCE from out head slot, ignore a batch sync + // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { trace!(self.log, "Ignoring full sync with peer"; "peer" => format!("{:?}", peer_id), @@ -139,7 +129,9 @@ impl ImportManager { target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_root: remote.head_root, downloaded_blocks: Vec::new(), - state: BlockRequestsState::QueuedForward, + state: BlockRequestsState::Queued, + forward_sync: true, + current_start_slot: self.chain.best_slot(), }; self.import_queue.insert(peer_id, block_requests); } @@ -165,8 +157,6 @@ impl ImportManager { } }; - // The response should contain at least one block. - // // If we are syncing up to a target head block, at least the target head block should be // returned. If we are syncing back to our last finalized block the request should return // at least the last block we received (last known block). In diagram form: @@ -176,33 +166,30 @@ impl ImportManager { // ^finalized slot ^ requested start slot ^ last known block ^ remote head if blocks.is_empty() { - warn!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); - block_requests.state = BlockRequestsState::Failed; + debug!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); + block_requests.update_start_slot(); return; } - // Add the newly downloaded blocks to the current list of downloaded blocks. This also - // determines if we are syncing forward or backward. - let syncing_forwards = { - if block_requests.downloaded_blocks.is_empty() { - block_requests.downloaded_blocks.append(&mut blocks); - true - } else if block_requests.downloaded_blocks[0].slot < blocks[0].slot { - // syncing forwards - // verify the peer hasn't sent overlapping blocks - ensuring the strictly - // increasing blocks in a batch will be verified during the processing - if block_requests.next_start_slot() > Some(blocks[0].slot) { - warn!(self.log, "BeaconBlocks response returned duplicate blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_start_slot()); - block_requests.state = BlockRequestsState::Failed; - return; - } - - block_requests.downloaded_blocks.append(&mut blocks); - true - } else { - false - } - }; + // verify the range of received blocks + // Note that the order of blocks is verified in block processing + let last_sent_slot = blocks[blocks.len() - 1].slot; + if block_requests.current_start_slot > blocks[0].slot + || block_requests + .current_start_slot + .add(MAX_BLOCKS_PER_REQUEST) + < last_sent_slot + { + //TODO: Downvote peer - add a reason to failed + dbg!(&blocks); + warn!(self.log, "BeaconBlocks response returned out of range blocks"; + "request_id" => request_id, + "response_initial_slot" => blocks[0].slot, + "requested_initial_slot" => block_requests.current_start_slot); + // consider this sync failed + block_requests.state = BlockRequestsState::Failed; + return; + } // Determine if more blocks need to be downloaded. There are a few cases: // - We have downloaded a batch from our head_slot, which has not reached the remotes head @@ -216,61 +203,60 @@ impl ImportManager { // chain. If so, process the blocks, if not, request more blocks all the way up to // our last finalized slot. - if syncing_forwards { - // does the batch contain the target_head_slot - let last_element_index = block_requests.downloaded_blocks.len() - 1; - if block_requests.downloaded_blocks[last_element_index].slot - >= block_requests.target_head_slot - { - // if the batch is on our chain, this is complete and we can then process. - // Otherwise start backwards syncing until we reach a common chain. - let earliest_slot = block_requests.downloaded_blocks[0].slot; - //TODO: Decide which is faster. Reading block from db and comparing or calculating - //the hash tree root and comparing. - if Some(block_requests.downloaded_blocks[0].canonical_root()) - == root_at_slot(self.chain, earliest_slot) - { - block_requests.state = BlockRequestsState::Complete; - return; - } - - // not on the same chain, request blocks backwards - let state = &self.chain.head().beacon_state; - let local_finalized_slot = state - .finalized_checkpoint - .epoch - .start_slot(T::EthSpec::slots_per_epoch()); - - // check that the request hasn't failed by having no common chain - if local_finalized_slot >= block_requests.downloaded_blocks[0].slot { - warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); - block_requests.state = BlockRequestsState::Failed; - return; - } - - // Start a backwards sync by requesting earlier blocks - // There can be duplication in downloaded blocks here if there are a large number - // of skip slots. In all cases we at least re-download the earliest known block. - // It is unlikely that a backwards sync in required, so we accept this duplication - // for now. - block_requests.state = BlockRequestsState::QueuedBackward; - } else { - // batch doesn't contain the head slot, request the next batch - block_requests.state = BlockRequestsState::QueuedForward; - } + if block_requests.forward_sync { + // append blocks if syncing forward + block_requests.downloaded_blocks.append(&mut blocks); } else { - // syncing backwards + // prepend blocks if syncing backwards + block_requests.downloaded_blocks.splice(..0, blocks); + } + + // does the batch contain the target_head_slot + let last_element_index = block_requests.downloaded_blocks.len() - 1; + if block_requests.downloaded_blocks[last_element_index].slot + >= block_requests.target_head_slot + || !block_requests.forward_sync + { // if the batch is on our chain, this is complete and we can then process. - // Otherwise continue backwards + // Otherwise start backwards syncing until we reach a common chain. let earliest_slot = block_requests.downloaded_blocks[0].slot; + //TODO: Decide which is faster. Reading block from db and comparing or calculating + //the hash tree root and comparing. if Some(block_requests.downloaded_blocks[0].canonical_root()) - == root_at_slot(self.chain, earliest_slot) + == root_at_slot(&self.chain, earliest_slot) { block_requests.state = BlockRequestsState::Complete; return; } - block_requests.state = BlockRequestsState::QueuedBackward; + + // not on the same chain, request blocks backwards + let state = &self.chain.head().beacon_state; + let local_finalized_slot = state + .finalized_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // check that the request hasn't failed by having no common chain + if local_finalized_slot >= block_requests.current_start_slot { + warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); + block_requests.state = BlockRequestsState::Failed; + return; + } + + // if this is a forward sync, then we have reached the head without a common chain + // and we need to start syncing backwards. + if block_requests.forward_sync { + // Start a backwards sync by requesting earlier blocks + block_requests.forward_sync = false; + block_requests.current_start_slot = std::cmp::min( + self.chain.best_slot(), + block_requests.downloaded_blocks[0].slot, + ); + } } + + // update the start slot and re-queue the batch + block_requests.update_start_slot(); } pub fn recent_blocks_response( @@ -296,7 +282,7 @@ impl ImportManager { // if an empty response is given, the peer didn't have the requested block, try again if blocks.is_empty() { parent_request.failed_attempts += 1; - parent_request.state = BlockRequestsState::QueuedForward; + parent_request.state = BlockRequestsState::Queued; parent_request.last_submitted_peer = peer_id; return; } @@ -316,7 +302,7 @@ impl ImportManager { parent_request.state = BlockRequestsState::Complete; } - pub fn inject_error(peer_id: PeerId, id: RequestId) { + pub fn _inject_error(_peer_id: PeerId, _id: RequestId) { //TODO: Remove block state from pending } @@ -358,13 +344,13 @@ impl ImportManager { downloaded_blocks: vec![block], failed_attempts: 0, last_submitted_peer: peer_id, - state: BlockRequestsState::QueuedBackward, + state: BlockRequestsState::Queued, }; self.parent_queue.push(req); } - pub fn poll(&mut self) -> ImportManagerOutcome { + pub(crate) fn poll(&mut self) -> ImportManagerOutcome { loop { // update the state of the manager self.update_state(); @@ -385,12 +371,11 @@ impl ImportManager { } // process any complete parent lookups - if let (re_run, outcome) = self.process_complete_parent_requests() { - if let Some(outcome) = outcome { - return outcome; - } else if !re_run { - break; - } + let (re_run, outcome) = self.process_complete_parent_requests(); + if let Some(outcome) = outcome { + return outcome; + } else if !re_run { + break; } } @@ -423,9 +408,10 @@ impl ImportManager { // If any in queued state we submit a request. // remove any failed batches + let debug_log = &self.log; self.import_queue.retain(|peer_id, block_request| { if let BlockRequestsState::Failed = block_request.state { - debug!(self.log, "Block import from peer failed"; + debug!(debug_log, "Block import from peer failed"; "peer_id" => format!("{:?}", peer_id), "downloaded_blocks" => block_request.downloaded_blocks.len() ); @@ -436,20 +422,18 @@ impl ImportManager { }); // process queued block requests - for (peer_id, block_requests) in self.import_queue.iter_mut().find(|(_peer_id, req)| { - req.state == BlockRequestsState::QueuedForward - || req.state == BlockRequestsState::QueuedBackward - }) { + for (peer_id, block_requests) in self + .import_queue + .iter_mut() + .find(|(_peer_id, req)| req.state == BlockRequestsState::Queued) + { let request_id = self.current_req_id; block_requests.state = BlockRequestsState::Pending(request_id); self.current_req_id += 1; let request = BeaconBlocksRequest { head_block_root: block_requests.target_head_root, - start_slot: block_requests - .next_start_slot() - .unwrap_or_else(|| self.chain.best_slot()) - .as_u64(), + start_slot: block_requests.current_start_slot.as_u64(), count: MAX_BLOCKS_PER_REQUEST, step: 0, }; @@ -504,9 +488,10 @@ impl ImportManager { fn process_parent_requests(&mut self) -> Option { // remove any failed requests + let debug_log = &self.log; self.parent_queue.retain(|parent_request| { if parent_request.state == BlockRequestsState::Failed { - debug!(self.log, "Parent import failed"; + debug!(debug_log, "Parent import failed"; "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()), "ancestors_found" => parent_request.downloaded_blocks.len() ); @@ -524,9 +509,15 @@ impl ImportManager { // check if parents need to be searched for for parent_request in self.parent_queue.iter_mut() { if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { - parent_request.state == BlockRequestsState::Failed; + parent_request.state = BlockRequestsState::Failed; continue; - } else if parent_request.state == BlockRequestsState::QueuedForward { + } else if parent_request.state == BlockRequestsState::Queued { + // check the depth isn't too large + if parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { + parent_request.state = BlockRequestsState::Failed; + continue; + } + parent_request.state = BlockRequestsState::Pending(self.current_req_id); self.current_req_id += 1; let last_element_index = parent_request.downloaded_blocks.len() - 1; @@ -564,7 +555,7 @@ impl ImportManager { if block_hash != expected_hash { // remove the head block let _ = completed_request.downloaded_blocks.pop(); - completed_request.state = BlockRequestsState::QueuedForward; + completed_request.state = BlockRequestsState::Queued; //TODO: Potentially downvote the peer let peer = completed_request.last_submitted_peer.clone(); debug!(self.log, "Peer sent invalid parent. Ignoring"; @@ -585,7 +576,7 @@ impl ImportManager { Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { // need to keep looking for parents completed_request.downloaded_blocks.push(block); - completed_request.state == BlockRequestsState::QueuedForward; + completed_request.state = BlockRequestsState::Queued; re_run = true; break; } @@ -598,7 +589,7 @@ impl ImportManager { "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", completed_request.last_submitted_peer), ); - completed_request.state == BlockRequestsState::QueuedForward; + completed_request.state = BlockRequestsState::Queued; re_run = true; return ( re_run, @@ -613,7 +604,7 @@ impl ImportManager { self.log, "Parent processing error"; "error" => format!("{:?}", e) ); - completed_request.state == BlockRequestsState::QueuedForward; + completed_request.state = BlockRequestsState::Queued; re_run = true; return ( re_run, @@ -691,6 +682,13 @@ impl ImportManager { ); } } + BlockProcessingOutcome::FinalizedSlot => { + trace!( + self.log, "Finalized or earlier block processed"; + "outcome" => format!("{:?}", outcome), + ); + // block reached our finalized slot or was earlier, move to the next block + } _ => { trace!( self.log, "InvalidBlock"; @@ -717,7 +715,7 @@ impl ImportManager { } fn root_at_slot( - chain: Arc>, + chain: &Arc>, target_slot: Slot, ) -> Option { chain diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index deadf214d6..924b2de9b6 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -2,24 +2,22 @@ use super::manager::{ImportManager, ImportManagerOutcome}; use crate::service::{NetworkMessage, OutgoingMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, error, info, o, trace, warn}; +use slog::{debug, info, o, trace, warn}; use ssz::Encode; -use std::collections::HashMap; +use std::ops::Sub; use std::sync::Arc; -use std::time::Duration; use store::Store; use tokio::sync::mpsc; -use types::{ - Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, -}; +use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; +/// The number of slots behind our head that we still treat a peer as a fully synced peer. +const FULL_PEER_TOLERANCE: u64 = 10; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; @@ -54,8 +52,8 @@ impl From<&Arc>> for PeerSyncInfo { /// The current syncing state. #[derive(PartialEq)] pub enum SyncState { - Idle, - Downloading, + _Idle, + _Downloading, _Stopped, } @@ -97,7 +95,7 @@ impl SimpleSync { /// Sends a `Hello` message to the peer. pub fn on_connect(&mut self, peer_id: PeerId) { self.network - .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); + .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&self.chain))); } /// Handle a `Hello` request. @@ -193,8 +191,16 @@ impl SimpleSync { { // If the node's best-block is already known to us and they are close to our current // head, treat them as a fully sync'd peer. - self.manager.add_full_peer(peer_id); - self.process_sync(); + if self.chain.best_slot().sub(remote.head_slot).as_u64() < FULL_PEER_TOLERANCE { + self.manager.add_full_peer(peer_id); + self.process_sync(); + } else { + debug!( + self.log, + "Out of sync peer connected"; + "peer" => format!("{:?}", peer_id), + ); + } } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // @@ -222,8 +228,11 @@ impl SimpleSync { "method" => "HELLO", "peer" => format!("{:?}", peer_id) ); - self.network - .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); + self.network.send_rpc_request( + None, + peer_id, + RPCRequest::Hello(hello_message(&self.chain)), + ); } ImportManagerOutcome::RequestBlocks { peer_id, @@ -238,8 +247,11 @@ impl SimpleSync { "count" => request.count, "peer" => format!("{:?}", peer_id) ); - self.network - .send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(request)); + self.network.send_rpc_request( + Some(request_id), + peer_id.clone(), + RPCRequest::BeaconBlocks(request), + ); } ImportManagerOutcome::RecentRequest(peer_id, req) => { trace!( @@ -249,8 +261,11 @@ impl SimpleSync { "count" => req.block_roots.len(), "peer" => format!("{:?}", peer_id) ); - self.network - .send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req)); + self.network.send_rpc_request( + None, + peer_id.clone(), + RPCRequest::RecentBeaconBlocks(req), + ); } ImportManagerOutcome::DownvotePeer(peer_id) => { trace!( @@ -270,6 +285,7 @@ impl SimpleSync { } } + //TODO: Move to beacon chain fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain .rev_iter_block_roots() @@ -333,36 +349,58 @@ impl SimpleSync { "start_slot" => req.start_slot, ); + //TODO: Optimize this + // Currently for skipped slots, the blocks returned could be less than the requested range. + // In the current implementation we read from the db then filter out out-of-range blocks. + // Improving the db schema to prevent this would be ideal. + let mut blocks: Vec> = self .chain .rev_iter_block_roots() .filter(|(_root, slot)| { - req.start_slot <= slot.as_u64() && req.start_slot + req.count >= slot.as_u64() + req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64() }) .take_while(|(_root, slot)| req.start_slot <= slot.as_u64()) .filter_map(|(root, _slot)| { if let Ok(Some(block)) = self.chain.store.get::>(&root) { Some(block) } else { - debug!( + warn!( self.log, - "Peer requested unknown block"; - "peer" => format!("{:?}", peer_id), + "Block in the chain is not in the store"; "request_root" => format!("{:}", root), ); None } }) + .filter(|block| block.slot >= req.start_slot) .collect(); + // TODO: Again find a more elegant way to include genesis if needed + // if the genesis is requested, add it in + if req.start_slot == 0 { + if let Ok(Some(genesis)) = self + .chain + .store + .get::>(&self.chain.genesis_block_root) + { + blocks.push(genesis); + } else { + warn!( + self.log, + "Requested genesis, which is not in the chain store"; + ); + } + } + blocks.reverse(); blocks.dedup_by_key(|brs| brs.slot); if blocks.len() as u64 != req.count { debug!( self.log, - "BeaconBlocksRequest"; + "BeaconBlocksRequest response"; "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", "start_slot" => req.start_slot, @@ -498,14 +536,19 @@ impl NetworkContext { } pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason)) + self.send_rpc_request(None, peer_id, RPCRequest::Goodbye(reason)) // TODO: disconnect peers. } - pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { - // Note: There is currently no use of keeping track of requests. However the functionality - // is left here for future revisions. - self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); + pub fn send_rpc_request( + &mut self, + request_id: Option, + peer_id: PeerId, + rpc_request: RPCRequest, + ) { + // use 0 as the default request id, when an ID is not required. + let request_id = request_id.unwrap_or_else(|| 0); + self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)); } //TODO: Handle Error responses