diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 14be9acdc3..a685e33248 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -19,6 +19,8 @@ use types::Hash256; /// Timeout for RPC requests. const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); +/// Timeout before banning a peer for non-identification. +const HELLO_TIMEOUT: Duration = Duration::from_secs(30); /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -28,22 +30,17 @@ pub struct MessageHandler { sync: SimpleSync, /// The network channel to relay messages to the Network service. network_send: crossbeam_channel::Sender, - /// A mapping of peers we have sent an RPC request to. - requests: HashMap>, + /// A mapping of peers and the RPC id we have sent an RPC request to. + requests: HashMap<(PeerId, u64), Instant>, + /// A mapping of HELLO requests we have sent. We drop/ban peers if they do not response + /// within the timeout + hello_requests: HashMap, /// A counter of request id for each peer. request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } -/// RPC request information -pub struct RPCRequestInfo { - /// The id of the request - id: u64, - /// The time the request was sent, to check ttl. - request_time: Instant, -} - /// Types of messages the handler can receive. #[derive(Debug, Clone)] pub enum HandlerMessage { @@ -79,7 +76,9 @@ impl MessageHandler { sync, network_send, requests: HashMap::new(), + hello_requests: HashMap::new(), request_ids: HashMap::new(), + log: log.clone(), }; @@ -140,15 +139,13 @@ impl MessageHandler { fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {} fn handle_hello_response(&mut self, peer_id: PeerId, id: u64, response: HelloMessage) { - /* - // if response id is not in our list, ignore (likely RPC timeout) - match self.requests.get(peer_id) { - None => return; - Some(rpc_info) => { - if rpc_info.con - - */ + if self.hello_requests.remove(&peer_id).is_none() { + // if response id is not in our list, ignore (likely RPC timeout) + return; + } + debug!(self.log, "Hello response received from peer: {:?}", peer_id); + // validate peer - decide whether to drop/ban or add to sync } /// Sends a HELLO RPC request to a newly connected peer. @@ -161,17 +158,12 @@ impl MessageHandler { *borrowed_id += 1; id }; - // register RPC request - { - let requests = self - .requests - .entry(peer_id.clone()) - .or_insert_with(|| vec![]); - requests.push(RPCRequestInfo { - id: id.clone(), - request_time: Instant::now(), - }); - } + // register RPC Hello request + self.requests.insert((peer_id.clone(), id), Instant::now()); + debug!( + self.log, + "Hello request registered with peer: {:?}", peer_id + ); // build the rpc request let rpc_event = RPCEvent::Request {