diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index be8fa21f89..782d2129ef 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -9,7 +9,7 @@ use eth2_libp2p::{ }; use futures::future::Future; use futures::stream::Stream; -use slog::{debug, trace, warn}; +use slog::{debug, o, trace, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; @@ -51,7 +51,8 @@ impl MessageHandler { executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result> { - trace!(log, "Service starting"); + let message_handler_log = log.new(o!("Service"=> "Message Handler")); + trace!(message_handler_log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); @@ -63,7 +64,7 @@ impl MessageHandler { let mut handler = MessageHandler { network_send, message_processor, - log: log.clone(), + log: message_handler_log, }; // spawn handler task and move the message handler instance into the spawned thread diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index f546306157..1357b54951 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -34,13 +34,8 @@ impl Service { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread - let message_handler_log = log.new(o!("Service" => "MessageHandler")); - let message_handler_send = MessageHandler::spawn( - beacon_chain, - network_send.clone(), - executor, - message_handler_log, - )?; + let message_handler_send = + MessageHandler::spawn(beacon_chain, network_send.clone(), executor, log.clone())?; let network_log = log.new(o!("Service" => "Network")); // launch libp2p service diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 12bef95fa5..171d0fdf0b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -251,7 +251,7 @@ pub fn spawn( // create an instance of the SyncManager let sync_manager = SyncManager { chain: beacon_chain, - state: ManagerState::Regular, + state: ManagerState::Stalled, input_channel: sync_recv, network, import_queue: HashMap::new(), @@ -510,7 +510,7 @@ impl SyncManager { &mut self, peer_id: PeerId, request_id: RequestId, - blocks: Vec>, + mut blocks: Vec>, ) { // find the request let parent_request = match self @@ -545,6 +545,11 @@ impl SyncManager { return; } + // add the block to response + parent_request + .downloaded_blocks + .push(blocks.pop().expect("must exist")); + // queue for processing parent_request.state = BlockRequestsState::ReadyToProcess; } @@ -594,7 +599,6 @@ impl SyncManager { "peer" => format!("{:?}", peer_id), ); self.full_peers.insert(peer_id); - self.update_state(); } /* Processing State Functions */ @@ -1077,7 +1081,6 @@ impl Future for SyncManager { Ok(Async::Ready(Some(message))) => match message { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); - dbg!("add peer"); } SyncMessage::BeaconBlocksResponse { peer_id, @@ -1118,17 +1121,13 @@ impl Future for SyncManager { //need to be called. let mut re_run = false; - dbg!(self.import_queue.len()); // only process batch requests if there are any if !self.import_queue.is_empty() { // process potential block requests self.process_potential_block_requests(); - dbg!(self.import_queue.len()); // process any complete long-range batches re_run = re_run || self.process_complete_batches(); - dbg!(self.import_queue.len()); - dbg!(&self.state); } // only process parent objects if we are in regular sync @@ -1140,9 +1139,6 @@ impl Future for SyncManager { re_run = re_run || self.process_complete_parent_requests(); } - dbg!(self.import_queue.len()); - dbg!(&self.state); - // Shutdown the thread if the chain has termined if let None = self.chain.upgrade() { return Ok(Async::Ready(())); @@ -1152,8 +1148,6 @@ impl Future for SyncManager { break; } } - dbg!(self.import_queue.len()); - dbg!(&self.state); // update the state of the manager self.update_state(); diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index d8b5f2dbf3..c54c481c73 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -352,7 +352,7 @@ impl MessageProcessor { "count" => beacon_blocks.len(), ); - self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse { + self.send_to_sync(SyncMessage::BeaconBlocksResponse { peer_id, request_id, beacon_blocks, @@ -368,12 +368,12 @@ impl MessageProcessor { ) { debug!( self.log, - "BeaconBlocksResponse"; + "RecentBeaconBlocksResponse"; "peer" => format!("{:?}", peer_id), "count" => beacon_blocks.len(), ); - self.send_to_sync(SyncMessage::BeaconBlocksResponse { + self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse { peer_id, request_id, beacon_blocks, diff --git a/tests/ef_tests/eth2.0-spec-tests b/tests/ef_tests/eth2.0-spec-tests index aaa1673f50..ae6dd9011d 160000 --- a/tests/ef_tests/eth2.0-spec-tests +++ b/tests/ef_tests/eth2.0-spec-tests @@ -1 +1 @@ -Subproject commit aaa1673f508103e11304833e0456e4149f880065 +Subproject commit ae6dd9011df05fab8c7e651c09cf9c940973bf81