From 6b39c693af28675c488202ad8def2980fb3c772d Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 24 Mar 2019 12:50:23 +1100 Subject: [PATCH] Extend syncing --- beacon_node/network/src/message_handler.rs | 49 ++--- beacon_node/network/src/sync/simple_sync.rs | 158 ++++++++++++++-- beacon_node/network/tests/tests.rs | 196 +++++++++++++++++++- 3 files changed, 364 insertions(+), 39 deletions(-) diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 2a84616e56..58ba0171db 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -113,10 +113,24 @@ impl MessageHandler { match request { RPCRequest::Hello(hello_message) => { self.sync - .on_hello(peer_id, hello_message, &mut self.network_context) + .on_hello_request(peer_id, hello_message, &mut self.network_context) } + RPCRequest::BeaconBlockRoots(request) => { + self.sync + .on_beacon_block_roots_request(peer_id, request, &mut self.network_context) + } + RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request( + peer_id, + request, + &mut self.network_context, + ), + RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request( + peer_id, + request, + &mut self.network_context, + ), // TODO: Handle all requests - _ => {} + _ => panic!("Unknown request: {:?}", request), } } @@ -133,48 +147,41 @@ impl MessageHandler { debug!(self.log, "Unrecognized response from peer: {:?}", peer_id); return; } - match response { + let response_str = match response { RPCResponse::Hello(hello_message) => { - debug!(self.log, "Hello response received from peer: {:?}", peer_id); self.sync - .on_hello(peer_id, hello_message, &mut self.network_context); + .on_hello_response(peer_id, hello_message, &mut self.network_context); + "Hello" } RPCResponse::BeaconBlockRoots(response) => { - debug!( - self.log, - "BeaconBlockRoots response received"; "peer" => format!("{:?}", peer_id) - ); self.sync.on_beacon_block_roots_response( peer_id, response, &mut self.network_context, - ) + ); + "BeaconBlockRoots" } RPCResponse::BeaconBlockHeaders(response) => { - debug!( - self.log, - "BeaconBlockHeaders response received"; "peer" => format!("{:?}", peer_id) - ); self.sync.on_beacon_block_headers_response( peer_id, response, &mut self.network_context, - ) + ); + "BeaconBlockHeaders" } RPCResponse::BeaconBlockBodies(response) => { - debug!( - self.log, - "BeaconBlockBodies response received"; "peer" => format!("{:?}", peer_id) - ); self.sync.on_beacon_block_bodies_response( peer_id, response, &mut self.network_context, - ) + ); + "BeaconBlockBodies" } // TODO: Handle all responses _ => panic!("Unknown response: {:?}", response), - } + }; + + debug!(self.log, "RPCResponse"; "type" => response_str); } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index b190f787fe..4726419d50 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -126,14 +126,31 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { + // Say hello back. network.send_rpc_response( peer_id.clone(), RPCResponse::Hello(self.chain.hello_message()), ); - self.on_hello(peer_id, hello, network); + + self.process_hello(peer_id, hello, network); } - pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) { + pub fn on_hello_response( + &mut self, + peer_id: PeerId, + hello: HelloMessage, + network: &mut NetworkContext, + ) { + // Process the hello message, without sending back another hello. + self.process_hello(peer_id, hello, network); + } + + fn process_hello( + &mut self, + peer_id: PeerId, + hello: HelloMessage, + network: &mut NetworkContext, + ) { let spec = self.chain.get_spec(); let remote = PeerSyncInfo::from(hello); @@ -142,7 +159,7 @@ impl SimpleSync { // network id must match if remote_status != PeerStatus::OnDifferentChain { - debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); + info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); self.known_peers.insert(peer_id.clone(), remote); } @@ -183,6 +200,44 @@ impl SimpleSync { } } + pub fn on_beacon_block_roots_request( + &mut self, + peer_id: PeerId, + request: BeaconBlockRootsRequest, + network: &mut NetworkContext, + ) { + let roots = match self + .chain + .get_block_roots(request.start_slot, request.count as usize, 0) + { + Ok(roots) => roots, + Err(e) => { + // TODO: return RPC error. + warn!( + self.log, + "RPCRequest"; "peer" => format!("{:?}", peer_id), + "request" => "BeaconBlockRoots", + "error" => format!("{:?}", e) + ); + return; + } + }; + + let roots = roots + .iter() + .enumerate() + .map(|(i, &block_root)| BlockRootSlot { + slot: request.start_slot + Slot::from(i), + block_root, + }) + .collect(); + + network.send_rpc_response( + peer_id, + RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }), + ) + } + pub fn on_beacon_block_roots_response( &mut self, peer_id: PeerId, @@ -219,6 +274,36 @@ impl SimpleSync { } } + pub fn on_beacon_block_headers_request( + &mut self, + peer_id: PeerId, + request: BeaconBlockHeadersRequest, + network: &mut NetworkContext, + ) { + let headers = match self.chain.get_block_headers( + request.start_slot, + request.max_headers as usize, + request.skip_slots as usize, + ) { + Ok(headers) => headers, + Err(e) => { + // TODO: return RPC error. + warn!( + self.log, + "RPCRequest"; "peer" => format!("{:?}", peer_id), + "request" => "BeaconBlockHeaders", + "error" => format!("{:?}", e) + ); + return; + } + }; + + network.send_rpc_response( + peer_id, + RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }), + ) + } + pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, @@ -237,9 +322,33 @@ impl SimpleSync { .import_queue .enqueue_headers(response.headers, peer_id.clone()); - if !block_roots.is_empty() { - self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); - } + self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + } + + pub fn on_beacon_block_bodies_request( + &mut self, + peer_id: PeerId, + request: BeaconBlockBodiesRequest, + network: &mut NetworkContext, + ) { + let block_bodies = match self.chain.get_block_bodies(&request.block_roots) { + Ok(bodies) => bodies, + Err(e) => { + // TODO: return RPC error. + warn!( + self.log, + "RPCRequest"; "peer" => format!("{:?}", peer_id), + "request" => "BeaconBlockBodies", + "error" => format!("{:?}", e) + ); + return; + } + }; + + network.send_rpc_response( + peer_id, + RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }), + ) } pub fn on_beacon_block_bodies_response( @@ -250,6 +359,11 @@ impl SimpleSync { ) { self.import_queue .enqueue_bodies(response.block_bodies, peer_id.clone()); + + // Clear out old entries + self.import_queue.remove_stale(); + + // Import blocks, if possible. self.process_import_queue(network); } @@ -268,10 +382,14 @@ impl SimpleSync { }) .collect(); + if !blocks.is_empty() { + info!(self.log, "Processing blocks"; "count" => blocks.len()); + } + // Sort the blocks to be in ascending slot order. blocks.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); - let mut imported_keys = vec![]; + let mut keys_to_delete = vec![]; for (key, block, sender) in blocks { match self.chain.process_block(block) { @@ -279,8 +397,10 @@ impl SimpleSync { if outcome.is_invalid() { warn!(self.log, "Invalid block: {:?}", outcome); network.disconnect(sender); + keys_to_delete.push(key) } else { - imported_keys.push(key) + // TODO: don't delete if was not invalid but not successfully processed. + keys_to_delete.push(key) } } Err(e) => { @@ -289,11 +409,9 @@ impl SimpleSync { } } - println!("imported_keys.len: {:?}", imported_keys.len()); - - if !imported_keys.is_empty() { - info!(self.log, "Imported {} blocks", imported_keys.len()); - for key in imported_keys { + if !keys_to_delete.is_empty() { + info!(self.log, "Processed {} blocks", keys_to_delete.len()); + for key in keys_to_delete { self.import_queue.partials.remove(&key); } } @@ -313,7 +431,10 @@ impl SimpleSync { debug!( self.log, - "Requesting {} block roots from {:?}.", request.count, &peer_id + "RPCRequest"; + "type" => "BeaconBlockRoots", + "count" => request.count, + "peer" => format!("{:?}", peer_id) ); // TODO: handle count > max count. @@ -328,7 +449,10 @@ impl SimpleSync { ) { debug!( self.log, - "Requesting {} headers from {:?}.", request.max_headers, &peer_id + "RPCRequest"; + "type" => "BeaconBlockHeaders", + "max_headers" => request.max_headers, + "peer" => format!("{:?}", peer_id) ); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(request)); @@ -377,6 +501,10 @@ impl ImportQueue { } } + /// Flushes all stale entries from the queue. + /// + /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the + /// past. pub fn remove_stale(&mut self) { let keys: Vec = self .partials diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 57587717bd..b951d7d2a2 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -17,6 +17,7 @@ pub struct SyncNode { pub id: usize, sender: Sender, receiver: Receiver, + peer_id: PeerId, harness: BeaconChainHarness, } @@ -43,6 +44,7 @@ impl SyncNode { id, sender: message_handler_sender, receiver: network_receiver, + peer_id: PeerId::random(), harness, } } @@ -63,6 +65,138 @@ impl SyncNode { self.harness.beacon_chain.hello_message() } + pub fn connect_to(&mut self, node: &SyncNode) { + let message = HandlerMessage::PeerDialed(self.peer_id.clone()); + node.send(message); + } + + /// Reads the receive queue from one node and passes the message to the other. Also returns a + /// copy of the message. + /// + /// self -----> node + /// | + /// us + /// + /// Named after the unix `tee` command. + fn tee(&mut self, node: &SyncNode) -> NetworkMessage { + let network_message = self.recv().expect("Timeout on tee"); + + let handler_message = match network_message.clone() { + NetworkMessage::Send(peer_id, OutgoingMessage::RPC(event)) => { + HandlerMessage::RPC(peer_id, event) + } + _ => panic!("tee cannot parse {:?}", network_message), + }; + + node.send(handler_message); + + network_message + } + + fn tee_hello_request(&mut self, node: &SyncNode) -> HelloMessage { + let request = self.tee_rpc_request(node); + + match request { + RPCRequest::Hello(message) => message, + _ => panic!("tee_hello_request got: {:?}", request), + } + } + + fn tee_hello_response(&mut self, node: &SyncNode) -> HelloMessage { + let response = self.tee_rpc_response(node); + + match response { + RPCResponse::Hello(message) => message, + _ => panic!("tee_hello_response got: {:?}", response), + } + } + + fn tee_block_root_request(&mut self, node: &SyncNode) -> BeaconBlockRootsRequest { + let msg = self.tee_rpc_request(node); + + match msg { + RPCRequest::BeaconBlockRoots(data) => data, + _ => panic!("tee_block_root_request got: {:?}", msg), + } + } + + fn tee_block_root_response(&mut self, node: &SyncNode) -> BeaconBlockRootsResponse { + let msg = self.tee_rpc_response(node); + + match msg { + RPCResponse::BeaconBlockRoots(data) => data, + _ => panic!("tee_block_root_response got: {:?}", msg), + } + } + + fn tee_block_header_request(&mut self, node: &SyncNode) -> BeaconBlockHeadersRequest { + let msg = self.tee_rpc_request(node); + + match msg { + RPCRequest::BeaconBlockHeaders(data) => data, + _ => panic!("tee_block_header_request got: {:?}", msg), + } + } + + fn tee_block_header_response(&mut self, node: &SyncNode) -> BeaconBlockHeadersResponse { + let msg = self.tee_rpc_response(node); + + match msg { + RPCResponse::BeaconBlockHeaders(data) => data, + _ => panic!("tee_block_header_response got: {:?}", msg), + } + } + + fn tee_block_body_request(&mut self, node: &SyncNode) -> BeaconBlockBodiesRequest { + let msg = self.tee_rpc_request(node); + + match msg { + RPCRequest::BeaconBlockBodies(data) => data, + _ => panic!("tee_block_body_request got: {:?}", msg), + } + } + + fn tee_block_body_response(&mut self, node: &SyncNode) -> BeaconBlockBodiesResponse { + let msg = self.tee_rpc_response(node); + + match msg { + RPCResponse::BeaconBlockBodies(data) => data, + _ => panic!("tee_block_body_response got: {:?}", msg), + } + } + + fn tee_rpc_request(&mut self, node: &SyncNode) -> RPCRequest { + let network_message = self.tee(node); + + match network_message { + NetworkMessage::Send( + _peer_id, + OutgoingMessage::RPC(RPCEvent::Request { + id: _, + method_id: _, + body, + }), + ) => body, + _ => panic!("tee_rpc_request failed! got {:?}", network_message), + } + } + + fn tee_rpc_response(&mut self, node: &SyncNode) -> RPCResponse { + let network_message = self.tee(node); + + match network_message { + NetworkMessage::Send( + _peer_id, + OutgoingMessage::RPC(RPCEvent::Response { + id: _, + method_id: _, + result, + }), + ) => result, + _ => panic!("tee_rpc_response failed! got {:?}", network_message), + } + } + pub fn get_block_root_request(&self) -> BeaconBlockRootsRequest { let request = self.recv_rpc_request().expect("No block root request"); @@ -181,7 +315,7 @@ impl SyncMaster { let roots = self .harness .beacon_chain - .get_block_roots(request.start_slot, Slot::from(request.count)) + .get_block_roots(request.start_slot, request.count as usize, 0) .expect("Beacon chain did not give block roots") .iter() .enumerate() @@ -203,7 +337,11 @@ impl SyncMaster { let roots = self .harness .beacon_chain - .get_block_roots(request.start_slot, Slot::from(request.max_headers)) + .get_block_roots( + request.start_slot, + request.max_headers as usize, + request.skip_slots as usize, + ) .expect("Beacon chain did not give blocks"); if roots.is_empty() { @@ -312,7 +450,7 @@ pub fn build_blocks(blocks: usize, master: &mut SyncMaster, nodes: &mut Vec