//! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; use tokio::sync::mpsc; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { pub sender_id: RangeRequestId, pub responses: Result>, String>, pub request_type: ByRangeRequestType, } #[derive(Debug, Clone, Copy)] pub enum RangeRequestId { RangeSync { chain_id: ChainId, batch_id: BatchId, }, BackfillSync { batch_id: BatchId, }, } /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender>, /// A sequential ID for all RPC requests. request_id: Id, /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. execution_engine_state: EngineState, /// Sends work to the beacon processor via a channel. network_beacon_processor: Arc>, pub chain: Arc>, /// Logger for the `SyncNetworkContext`. pub log: slog::Logger, } /// Small enumeration to make dealing with block and blob requests easier. pub enum BlockOrBlob { Block(Option>>), Blob(Option>>), } impl From>>> for BlockOrBlob { fn from(block: Option>>) -> Self { BlockOrBlob::Block(block) } } impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { BlockOrBlob::Blob(blob) } } impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, network_beacon_processor: Arc>, chain: Arc>, log: slog::Logger, ) -> Self { SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, log, } } pub fn network_globals(&self) -> &NetworkGlobals { &self.network_beacon_processor.network_globals } /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { self.network_globals() .peers .read() .peer_info(peer_id) .map(|info| info.client().clone()) .unwrap_or_default() } pub fn status_peers(&self, chain: &C, peers: impl Iterator) { let status_message = chain.status_message(); for peer_id in peers { debug!( self.log, "Sending Status Request"; "peer" => %peer_id, "fork_digest" => ?status_message.fork_digest, "finalized_root" => ?status_message.finalized_root, "finalized_epoch" => ?status_message.finalized_epoch, "head_root" => %status_message.head_root, "head_slot" => %status_message.head_slot, ); let request = Request::Status(status_message.clone()); let request_id = RequestId::Router; let _ = self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, request_id, }); } } /// A blocks by range request for the range sync algorithm. pub fn blocks_by_range_request( &mut self, peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, ) -> Result { let id = self.next_id(); trace!( self.log, "Sending BlocksByRange request"; "method" => "BlocksByRange", "count" => request.count(), "peer" => %peer_id, ); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRange(request.clone()), request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), })?; if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { debug!( self.log, "Sending BlobsByRange requests"; "method" => "BlobsByRange", "count" => request.count(), "peer" => %peer_id, ); // Create the blob request based on the blocks request. self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: Request::BlobsByRange(BlobsByRangeRequest { start_slot: *request.start_slot(), count: *request.count(), }), request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), })?; } Ok(id) } /// A blocks by range request sent by the range sync algorithm pub fn blocks_and_blobs_by_range_request( &mut self, peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, sender_id: RangeRequestId, ) -> Result { let id = self.blocks_by_range_request(peer_id, batch_type, request)?; self.range_blocks_and_blobs_requests .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type))); Ok(id) } pub fn range_request_failed(&mut self, request_id: Id) -> Option { let sender_id = self .range_blocks_and_blobs_requests .remove(&request_id) .map(|(sender_id, _info)| sender_id); if let Some(sender_id) = sender_id { debug!( self.log, "Sync range request failed"; "request_id" => request_id, "sender_id" => ?sender_id ); Some(sender_id) } else { debug!(self.log, "Sync range request failed"; "request_id" => request_id); None } } /// Received a blocks by range or blobs by range response for a request that couples blocks ' /// and blobs. pub fn range_block_and_blob_response( &mut self, request_id: Id, block_or_blob: BlockOrBlob, ) -> Option> { match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything let (sender_id, info) = entry.remove(); let request_type = info.get_request_type(); Some(BlocksAndBlobsByRangeResponse { sender_id, request_type, responses: info.into_responses(), }) } else { None } } Entry::Vacant(_) => None, } } pub fn block_lookup_request( &self, id: SingleLookupReqId, peer_id: PeerId, request: BlocksByRootRequest, ) -> Result<(), &'static str> { debug!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", "block_roots" => ?request.block_roots().to_vec(), "peer" => %peer_id, "id" => ?id ); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRoot(request), request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), })?; Ok(()) } pub fn blob_lookup_request( &self, id: SingleLookupReqId, blob_peer_id: PeerId, blob_request: BlobsByRootRequest, ) -> Result<(), &'static str> { if let Some(block_root) = blob_request .blob_ids .as_slice() .first() .map(|id| id.block_root) { let indices = blob_request .blob_ids .as_slice() .iter() .map(|id| id.index) .collect::>(); debug!( self.log, "Sending BlobsByRoot Request"; "method" => "BlobsByRoot", "block_root" => ?block_root, "blob_indices" => ?indices, "peer" => %blob_peer_id, "id" => ?id ); self.send_network_msg(NetworkMessage::SendRequest { peer_id: blob_peer_id, request: Request::BlobsByRoot(blob_request), request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), })?; } Ok(()) } pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } pub fn update_execution_engine_state(&mut self, engine_state: EngineState) { debug!(self.log, "Sync's view on execution engine state updated"; "past_state" => ?self.execution_engine_state, "new_state" => ?engine_state); self.execution_engine_state = engine_state; } /// Terminates the connection with the peer and bans them. pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.network_send .send(NetworkMessage::GoodbyePeer { peer_id, reason, source: ReportSource::SyncService, }) .unwrap_or_else(|_| { warn!(self.log, "Could not report peer: channel failed"); }); } /// Reports to the scoring algorithm the behaviour of a peer. pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) { debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); self.network_send .send(NetworkMessage::ReportPeer { peer_id, action, source: ReportSource::SyncService, msg, }) .unwrap_or_else(|e| { warn!(self.log, "Could not report peer: channel failed"; "error"=> %e); }); } /// Subscribes to core topics. pub fn subscribe_core_topics(&self) { self.network_send .send(NetworkMessage::SubscribeCoreTopics) .unwrap_or_else(|e| { warn!(self.log, "Could not subscribe to core topics."; "error" => %e); }); } /// Sends an arbitrary network message. fn send_network_msg(&self, msg: NetworkMessage) -> Result<(), &'static str> { self.network_send.send(msg).map_err(|_| { debug!(self.log, "Could not send message to the network service"); "Network channel send Failed" }) } pub fn beacon_processor_if_enabled(&self) -> Option<&Arc>> { self.is_execution_engine_online() .then_some(&self.network_beacon_processor) } pub fn beacon_processor(&self) -> &Arc> { &self.network_beacon_processor } pub fn next_id(&mut self) -> Id { let id = self.request_id; self.request_id += 1; id } /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { // Induces a compile time panic if this doesn't hold true. #[allow(clippy::assertions_on_constants)] const _: () = assert!( super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 && super::range_sync::EPOCHS_PER_BATCH == 1, "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if epoch >= data_availability_boundary { ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks } } else { ByRangeRequestType::Blocks } } pub fn insert_range_blocks_and_blobs_request( &mut self, id: Id, sender_id: RangeRequestId, info: BlocksAndBlobsRequestInfo, ) { self.range_blocks_and_blobs_requests .insert(id, (sender_id, info)); } }