use super::gossip_processor::{GossipProcessor, WorkEvent as GossipWorkEvent}; use crate::service::NetworkMessage; use crate::sync::{PeerSyncInfo, SyncMessage}; use beacon_chain::{ observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock, }; use eth2_libp2p::rpc::*; use eth2_libp2p::{ MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use state_processing::SigVerifiedOp; use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, ChainSpec, Epoch, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, }; //TODO: Rate limit requests /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. pub struct Processor { /// A reference to the underlying beacon chain. chain: Arc>, /// A channel to the syncing thread. sync_send: mpsc::UnboundedSender>, /// A network context to return and handle RPC requests. network: HandlerNetworkContext, /// A multi-threaded, non-blocking processor for consensus gossip messages. gossip_processor_send: mpsc::Sender>, /// The `RPCHandler` logger. log: slog::Logger, } impl Processor { /// Instantiate a `Processor` instance pub fn new( executor: environment::TaskExecutor, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, log: &slog::Logger, ) -> Self { let sync_logger = log.new(o!("service"=> "sync")); // spawn the sync thread let sync_send = crate::sync::manager::spawn( executor.clone(), beacon_chain.clone(), network_globals.clone(), network_send.clone(), sync_logger, ); let gossip_processor_send = GossipProcessor { beacon_chain: beacon_chain.clone(), network_tx: network_send.clone(), sync_tx: sync_send.clone(), network_globals, executor, max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, log: log.clone(), } .spawn_manager(); Processor { chain: beacon_chain, sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), gossip_processor_send, log: log.clone(), } } fn send_to_sync(&mut self, message: SyncMessage) { self.sync_send.send(message).unwrap_or_else(|_| { warn!( self.log, "Could not send message to the sync service"; ) }); } /// Handle a peer disconnect. /// /// Removes the peer from the manager. pub fn on_disconnect(&mut self, peer_id: PeerId) { self.send_to_sync(SyncMessage::Disconnect(peer_id)); } /// An error occurred during an RPC request. The state is maintained by the sync manager, so /// this function notifies the sync manager of the error. pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { // Check if the failed RPC belongs to sync if let RequestId::Sync(id) = request_id { self.send_to_sync(SyncMessage::RPCError(peer_id, id)); } } /// Sends a `Status` message to the peer. /// /// Called when we first connect to a peer, or when the PeerManager determines we need to /// re-status. pub fn send_status(&mut self, peer_id: PeerId) { if let Some(status_message) = status_message(&self.chain) { debug!( self.log, "Sending Status Request"; "peer" => peer_id.to_string(), "fork_digest" => format!("{:?}", status_message.fork_digest), "finalized_root" => format!("{:?}", status_message.finalized_root), "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), "head_root" => format!("{}", status_message.head_root), "head_slot" => format!("{}", status_message.head_slot), ); self.network .send_processor_request(peer_id, Request::Status(status_message)); } } /// Handle a `Status` request. /// /// Processes the `Status` from the remote peer and sends back our `Status`. pub fn on_status_request( &mut self, peer_id: PeerId, request_id: PeerRequestId, status: StatusMessage, ) { debug!( self.log, "Received Status Request"; "peer" => peer_id.to_string(), "fork_digest" => format!("{:?}", status.fork_digest), "finalized_root" => format!("{:?}", status.finalized_root), "finalized_epoch" => format!("{:?}", status.finalized_epoch), "head_root" => format!("{}", status.head_root), "head_slot" => format!("{}", status.head_slot), ); // ignore status responses if we are shutting down if let Some(status_message) = status_message(&self.chain) { // Say status back. self.network.send_response( peer_id.clone(), Response::Status(status_message), request_id, ); } self.process_status(peer_id, status); } /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { debug!( self.log, "Received Status Response"; "peer_id" => peer_id.to_string(), "fork_digest" => format!("{:?}", status.fork_digest), "finalized_root" => format!("{:?}", status.finalized_root), "finalized_epoch" => format!("{:?}", status.finalized_epoch), "head_root" => format!("{}", status.head_root), "head_slot" => format!("{}", status.head_slot), ); // Process the status message, without sending back another status. self.process_status(peer_id, status); } /// Process a `Status` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) { let remote = PeerSyncInfo::from(status); let local = match PeerSyncInfo::from_chain(&self.chain) { Some(local) => local, None => { return error!( self.log, "Failed to get peer sync info"; "msg" => "likely due to head lock contention" ) } }; let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); if local.fork_digest != remote.fork_digest { // The node is on a different network/fork, disconnect them. debug!( self.log, "Handshake Failure"; "peer_id" => peer_id.to_string(), "reason" => "incompatible forks", "our_fork" => hex::encode(local.fork_digest), "their_fork" => hex::encode(remote.fork_digest) ); self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.head_slot > self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE { // Note: If the slot_clock cannot be read, this will not error. Other system // components will deal with an invalid slot clock error. // The remotes head is on a slot that is significantly ahead of ours. This could be // because they are using a different genesis time, or that theirs or our system // clock is incorrect. debug!( self.log, "Handshake Failure"; "peer" => peer_id.to_string(), "reason" => "different system clocks or genesis time" ); self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch <= local.finalized_epoch && remote.finalized_root != Hash256::zero() && local.finalized_root != Hash256::zero() && self .chain .root_at_slot(start_slot(remote.finalized_epoch)) .map(|root_opt| root_opt != Some(remote.finalized_root)) .unwrap_or_else(|_| false) { // The remotes finalized epoch is less than or greater than ours, but the block root is // different to the one in our chain. // // Therefore, the node is on a different chain and we should not communicate with them. debug!( self.log, "Handshake Failure"; "peer" => peer_id.to_string(), "reason" => "different finalized chain" ); self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch < local.finalized_epoch { // The node has a lower finalized epoch, their chain is not useful to us. There are two // cases where a node can have a lower finalized epoch: // // ## The node is on the same chain // // If a node is on the same chain but has a lower finalized epoch, their head must be // lower than ours. Therefore, we have nothing to request from them. // // ## The node is on a fork // // If a node is on a fork that has a lower finalized epoch, switching to that fork would // cause us to revert a finalized block. This is not permitted, therefore we have no // interest in their blocks. debug!( self.log, "NaivePeer"; "peer" => peer_id.to_string(), "reason" => "lower finalized epoch" ); } else if self .chain .store .item_exists::>(&remote.head_root) .unwrap_or_else(|_| false) { debug!( self.log, "Peer with known chain found"; "peer" => peer_id.to_string(), "remote_head_slot" => remote.head_slot, "remote_latest_finalized_epoch" => remote.finalized_epoch, ); // If the node's best-block is already known to us and they are close to our current // head, treat them as a fully sync'd peer. self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // // Therefore, there are some blocks between the local finalized epoch and the remote // head that are worth downloading. debug!( self.log, "UsefulPeer"; "peer" => peer_id.to_string(), "local_finalized_epoch" => local.finalized_epoch, "remote_latest_finalized_epoch" => remote.finalized_epoch, ); self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } } /// Handle a `BlocksByRoot` request from the peer. pub fn on_blocks_by_root_request( &mut self, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) { let mut send_block_count = 0; for root in request.block_roots.iter() { if let Ok(Some(block)) = self.chain.store.get_block(root) { self.network.send_response( peer_id.clone(), Response::BlocksByRoot(Some(Box::new(block))), request_id, ); send_block_count += 1; } else { debug!( self.log, "Peer requested unknown block"; "peer" => peer_id.to_string(), "request_root" => format!("{:}", root), ); } } debug!( self.log, "Received BlocksByRoot Request"; "peer" => peer_id.to_string(), "requested" => request.block_roots.len(), "returned" => send_block_count, ); // send stream termination self.network .send_response(peer_id, Response::BlocksByRoot(None), request_id); } /// Handle a `BlocksByRange` request from the peer. pub fn on_blocks_by_range_request( &mut self, peer_id: PeerId, request_id: PeerRequestId, mut req: BlocksByRangeRequest, ) { debug!( self.log, "Received BlocksByRange Request"; "peer" => format!("{:?}", peer_id), "count" => req.count, "start_slot" => req.start_slot, "step" => req.step, ); // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOCKS { req.count = MAX_REQUEST_BLOCKS; } if req.step == 0 { warn!(self.log, "Peer sent invalid range request"; "error" => "Step sent was 0"); self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); return; } let forwards_block_root_iter = match self .chain .forwards_iter_block_roots(Slot::from(req.start_slot)) { Ok(iter) => iter, Err(e) => { return error!( self.log, "Unable to obtain root iter"; "error" => format!("{:?}", e) ) } }; // pick out the required blocks, ignoring skip-slots and stepping by the step parameter; let mut last_block_root = None; let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step) // map skip slots to None .map(|(root, _)| { let result = if Some(root) == last_block_root { None } else { Some(root) }; last_block_root = Some(root); result }) .step_by(req.step as usize) .collect::>>() }); let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e)); return; } }; // remove all skip slots let block_roots = block_roots .into_iter() .filter_map(|root| root) .collect::>(); let mut blocks_sent = 0; for root in block_roots { if let Ok(Some(block)) = self.chain.store.get_block(&root) { // Due to skip slots, blocks could be out of the range, we ensure they are in the // range before sending if block.slot() >= req.start_slot && block.slot() < req.start_slot + req.count * req.step { blocks_sent += 1; self.network.send_response( peer_id.clone(), Response::BlocksByRange(Some(Box::new(block))), request_id, ); } } else { error!( self.log, "Block in the chain is not in the store"; "request_root" => format!("{:}", root), ); } } if blocks_sent < (req.count as usize) { debug!( self.log, "BlocksByRange Response Sent"; "peer" => peer_id.to_string(), "msg" => "Failed to return all requested blocks", "start_slot" => req.start_slot, "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), "requested" => req.count, "returned" => blocks_sent); } else { debug!( self.log, "Sending BlocksByRange Response"; "peer" => peer_id.to_string(), "start_slot" => req.start_slot, "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), "requested" => req.count, "returned" => blocks_sent); } // send the stream terminator self.network .send_response(peer_id, Response::BlocksByRange(None), request_id); } /// Handle a `BlocksByRange` response from the peer. /// A `beacon_block` behaves as a stream which is terminated on a `None` response. pub fn on_blocks_by_range_response( &mut self, peer_id: PeerId, request_id: RequestId, beacon_block: Option>>, ) { trace!( self.log, "Received BlocksByRange Response"; "peer" => peer_id.to_string(), ); if let RequestId::Sync(id) = request_id { self.send_to_sync(SyncMessage::BlocksByRangeResponse { peer_id, request_id: id, beacon_block, }); } else { debug!( self.log, "All blocks by range responses should belong to sync" ); } } /// Handle a `BlocksByRoot` response from the peer. pub fn on_blocks_by_root_response( &mut self, peer_id: PeerId, request_id: RequestId, beacon_block: Option>>, ) { trace!( self.log, "Received BlocksByRoot Response"; "peer" => peer_id.to_string(), ); if let RequestId::Sync(id) = request_id { self.send_to_sync(SyncMessage::BlocksByRootResponse { peer_id, request_id: id, beacon_block, }); } else { debug!( self.log, "All Blocks by Root responses should belong to sync" ) } } /// Template function to be called on a block to determine if the block should be propagated /// across the network. pub fn should_forward_block( &mut self, block: Box>, ) -> Result, BlockError> { self.chain.verify_block_for_gossip(*block) } pub fn on_unknown_parent( &mut self, peer_id: PeerId, block: Box>, ) { self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block)); } /// Process a gossip message declaring a new block. /// /// Attempts to apply to block to the beacon chain. May queue the block for later processing. /// /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip( &mut self, peer_id: PeerId, verified_block: GossipVerifiedBlock, ) -> bool { let block = Box::new(verified_block.block.clone()); match self.chain.process_block(verified_block) { Ok(_block_root) => { trace!( self.log, "Gossipsub block processed"; "peer_id" => peer_id.to_string() ); // TODO: It would be better if we can run this _after_ we publish the block to // reduce block propagation latency. // // The `MessageHandler` would be the place to put this, however it doesn't seem // to have a reference to the `BeaconChain`. I will leave this for future // works. match self.chain.fork_choice() { Ok(()) => trace!( self.log, "Fork choice success"; "location" => "block gossip" ), Err(e) => error!( self.log, "Fork choice failed"; "error" => format!("{:?}", e), "location" => "block gossip" ), } } Err(BlockError::ParentUnknown { .. }) => { // Inform the sync manager to find parents for this block // This should not occur. It should be checked by `should_forward_block` error!( self.log, "Block with unknown parent attempted to be processed"; "peer_id" => peer_id.to_string() ); self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block)); } other => { warn!( self.log, "Invalid gossip beacon block"; "outcome" => format!("{:?}", other), "block root" => format!("{}", block.canonical_root()), "block slot" => block.slot() ); trace!( self.log, "Invalid gossip beacon block ssz"; "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), ); } } // TODO: Update with correct block gossip checking true } pub fn on_unaggregated_attestation_gossip( &mut self, message_id: MessageId, peer_id: PeerId, unaggregated_attestation: Attestation, subnet_id: SubnetId, should_process: bool, ) { self.gossip_processor_send .try_send(GossipWorkEvent::unaggregated_attestation( message_id, peer_id, unaggregated_attestation, subnet_id, should_process, )) .unwrap_or_else(|e| { error!( &self.log, "Unable to send to gossip processor"; "type" => "unaggregated attestation gossip", "error" => e.to_string(), ) }) } pub fn on_aggregated_attestation_gossip( &mut self, message_id: MessageId, peer_id: PeerId, aggregate: SignedAggregateAndProof, ) { self.gossip_processor_send .try_send(GossipWorkEvent::aggregated_attestation( message_id, peer_id, aggregate, )) .unwrap_or_else(|e| { error!( &self.log, "Unable to send to gossip processor"; "type" => "aggregated attestation gossip", "error" => e.to_string(), ) }) } /// Verify a voluntary exit before gossiping or processing it. /// /// Errors are logged at debug level. pub fn verify_voluntary_exit_for_gossip( &self, peer_id: &PeerId, voluntary_exit: SignedVoluntaryExit, ) -> Option> { let validator_index = voluntary_exit.message.validator_index; match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) { Ok(ObservationOutcome::New(sig_verified_exit)) => Some(sig_verified_exit), Ok(ObservationOutcome::AlreadyKnown) => { debug!( self.log, "Dropping exit for already exiting validator"; "validator_index" => validator_index, "peer" => peer_id.to_string() ); None } Err(e) => { debug!( self.log, "Dropping invalid exit"; "validator_index" => validator_index, "peer" => peer_id.to_string(), "error" => format!("{:?}", e) ); None } } } /// Import a verified exit into the op pool. pub fn import_verified_voluntary_exit( &self, verified_voluntary_exit: SigVerifiedOp, ) { self.chain.import_voluntary_exit(verified_voluntary_exit); debug!(self.log, "Successfully imported voluntary exit"); } /// Verify a proposer slashing before gossiping or processing it. /// /// Errors are logged at debug level. pub fn verify_proposer_slashing_for_gossip( &self, peer_id: &PeerId, proposer_slashing: ProposerSlashing, ) -> Option> { let validator_index = proposer_slashing.signed_header_1.message.proposer_index; match self .chain .verify_proposer_slashing_for_gossip(proposer_slashing) { Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing), Ok(ObservationOutcome::AlreadyKnown) => { debug!( self.log, "Dropping proposer slashing"; "reason" => "Already seen a proposer slashing for that validator", "validator_index" => validator_index, "peer" => peer_id.to_string() ); None } Err(e) => { debug!( self.log, "Dropping invalid proposer slashing"; "validator_index" => validator_index, "peer" => peer_id.to_string(), "error" => format!("{:?}", e) ); None } } } /// Import a verified proposer slashing into the op pool. pub fn import_verified_proposer_slashing( &self, proposer_slashing: SigVerifiedOp, ) { self.chain.import_proposer_slashing(proposer_slashing); debug!(self.log, "Successfully imported proposer slashing"); } /// Verify an attester slashing before gossiping or processing it. /// /// Errors are logged at debug level. pub fn verify_attester_slashing_for_gossip( &self, peer_id: &PeerId, attester_slashing: AttesterSlashing, ) -> Option>> { match self .chain .verify_attester_slashing_for_gossip(attester_slashing) { Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing), Ok(ObservationOutcome::AlreadyKnown) => { debug!( self.log, "Dropping attester slashing"; "reason" => "Slashings already known for all slashed validators", "peer" => peer_id.to_string() ); None } Err(e) => { debug!( self.log, "Dropping invalid attester slashing"; "peer" => peer_id.to_string(), "error" => format!("{:?}", e) ); None } } } /// Import a verified attester slashing into the op pool. pub fn import_verified_attester_slashing( &self, attester_slashing: SigVerifiedOp>, ) { if let Err(e) = self.chain.import_attester_slashing(attester_slashing) { debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); } else { debug!(self.log, "Successfully imported attester slashing"); } } } /// Build a `StatusMessage` representing the state of the given `beacon_chain`. pub(crate) fn status_message( beacon_chain: &BeaconChain, ) -> Option { let head_info = beacon_chain.head_info().ok()?; let genesis_validators_root = beacon_chain.genesis_validators_root; let fork_digest = ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root); Some(StatusMessage { fork_digest, finalized_root: head_info.finalized_checkpoint.root, finalized_epoch: head_info.finalized_checkpoint.epoch, head_root: head_info.block_root, head_slot: head_info.slot, }) } /// Wraps a Network Channel to employ various RPC related network functionality for the /// processor. pub struct HandlerNetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender>, /// Logger for the `NetworkContext`. log: slog::Logger, } impl HandlerNetworkContext { pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { Self { network_send, log } } /// Sends a message to the network task. fn inform_network(&mut self, msg: NetworkMessage) { self.network_send .send(msg) .unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service")) } /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason }); } /// Reports a peer's action, adjusting the peer's score. pub fn _report_peer(&mut self, peer_id: PeerId, action: PeerAction) { self.inform_network(NetworkMessage::ReportPeer { peer_id, action }); } /// Sends a request to the network task. pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { self.inform_network(NetworkMessage::SendRequest { peer_id, request_id: RequestId::Router, request, }) } /// Sends a response to the network task. pub fn send_response(&mut self, peer_id: PeerId, response: Response, id: PeerRequestId) { self.inform_network(NetworkMessage::SendResponse { peer_id, id, response, }) } /// Sends an error response to the network task. pub fn _send_error_response( &mut self, peer_id: PeerId, id: PeerRequestId, error: RPCResponseErrorCode, reason: String, ) { self.inform_network(NetworkMessage::SendError { peer_id, error, id, reason, }) } }