From a59a61fef99c1c91edc4e660f633df6dee6bf862 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 9 Jul 2024 00:56:14 +0100 Subject: [PATCH] Remove generic Id param from RequestId (#6032) * rename RequestId's for better context, and move them to lighthouse_network crate. * remove unrequired generic AppReqId from RequestID --- .../src/rpc/self_limiter.rs | 28 +++++++--- .../src/service/api_types.rs | 36 +++++++++++-- .../src/service/behaviour.rs | 7 ++- .../lighthouse_network/src/service/mod.rs | 46 +++++++--------- .../lighthouse_network/tests/common.rs | 7 ++- .../lighthouse_network/tests/rpc_tests.rs | 31 +++++------ beacon_node/network/src/router.rs | 54 +++++++++---------- beacon_node/network/src/service.rs | 15 ++---- .../network/src/sync/backfill_sync/mod.rs | 3 +- .../network/src/sync/block_lookups/common.rs | 2 +- .../network/src/sync/block_lookups/mod.rs | 5 +- .../sync/block_lookups/single_block_lookup.rs | 2 +- .../network/src/sync/block_lookups/tests.rs | 16 +++--- beacon_node/network/src/sync/manager.rs | 52 ++++++------------ .../network/src/sync/network_context.rs | 16 +++--- .../network/src/sync/range_sync/batch.rs | 2 +- .../network/src/sync/range_sync/chain.rs | 5 +- .../network/src/sync/range_sync/range.rs | 18 ++++--- 18 files changed, 175 insertions(+), 170 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 115ae45a98..77caecb16d 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -212,7 +212,7 @@ mod tests { use crate::rpc::rate_limiter::Quota; use crate::rpc::self_limiter::SelfRateLimiter; use crate::rpc::{OutboundRequest, Ping, Protocol}; - use crate::service::api_types::RequestId; + use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId}; use libp2p::PeerId; use std::time::Duration; use types::MainnetEthSpec; @@ -225,15 +225,17 @@ mod tests { ping_quota: Quota::n_every(1, 2), ..Default::default() }); - let mut limiter: SelfRateLimiter, MainnetEthSpec> = + let mut limiter: SelfRateLimiter = SelfRateLimiter::new(config, log).unwrap(); let peer_id = PeerId::random(); - for i in 1..=5 { + for i in 1..=5u32 { let _ = limiter.allows( peer_id, - RequestId::Application(i), - OutboundRequest::Ping(Ping { data: i }), + RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { + id: i, + })), + OutboundRequest::Ping(Ping { data: i as u64 }), ); } @@ -246,8 +248,13 @@ mod tests { // Check that requests in the queue are ordered in the sequence 2, 3, 4, 5. let mut iter = queue.iter(); - for i in 2..=5 { - assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i)); + for i in 2..=5u32 { + assert!(matches!( + iter.next().unwrap().request_id, + RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { + id, + })) if id == i + )); } assert_eq!(limiter.ready_requests.len(), 0); @@ -267,7 +274,12 @@ mod tests { // Check that requests in the queue are ordered in the sequence 3, 4, 5. let mut iter = queue.iter(); for i in 3..=5 { - assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i)); + assert!(matches!( + iter.next().unwrap().request_id, + RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { + id + })) if id == i + )); } assert_eq!(limiter.ready_requests.len(), 1); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 2ea4150248..376ac34dee 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -19,10 +19,36 @@ use crate::rpc::{ /// Identifier of requests sent by a peer. pub type PeerRequestId = (ConnectionId, SubstreamId); -/// Identifier of a request. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum RequestId { - Application(AppReqId), +pub type Id = u32; + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct SingleLookupReqId { + pub lookup_id: Id, + pub req_id: Id, +} + +/// Id of rpc requests sent by sync to the network. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum SyncRequestId { + /// Request searching for a block given a hash. + SingleBlock { id: SingleLookupReqId }, + /// Request searching for a set of blobs given a hash. + SingleBlob { id: SingleLookupReqId }, + /// Range request that is composed by both a block range request and a blob range request. + RangeBlockAndBlobs { id: Id }, +} + +/// Application level requests sent to the network. +#[derive(Debug, Clone, Copy)] +pub enum AppRequestId { + Sync(SyncRequestId), + Router, +} + +/// Global identifier of a request. +#[derive(Debug, Clone, Copy)] +pub enum RequestId { + Application(AppRequestId), Internal, } @@ -142,7 +168,7 @@ impl std::convert::From> for RPCCodedResponse { } } -impl slog::Value for RequestId { +impl slog::Value for RequestId { fn serialize( &self, record: &slog::Record, diff --git a/beacon_node/lighthouse_network/src/service/behaviour.rs b/beacon_node/lighthouse_network/src/service/behaviour.rs index 90121ffbfb..ab2e43630b 100644 --- a/beacon_node/lighthouse_network/src/service/behaviour.rs +++ b/beacon_node/lighthouse_network/src/service/behaviour.rs @@ -1,6 +1,6 @@ use crate::discovery::Discovery; use crate::peer_manager::PeerManager; -use crate::rpc::{ReqId, RPC}; +use crate::rpc::RPC; use crate::types::SnappyTransform; use libp2p::identify; @@ -16,9 +16,8 @@ pub type SubscriptionFilter = pub type Gossipsub = gossipsub::Behaviour; #[derive(NetworkBehaviour)] -pub(crate) struct Behaviour +pub(crate) struct Behaviour where - AppReqId: ReqId, E: EthSpec, { /// Keep track of active and pending connections to enforce hard limits. @@ -26,7 +25,7 @@ where /// The peer manager that keeps track of peer's reputation and status. pub peer_manager: PeerManager, /// The Eth2 RPC specified in the wire-0 protocol. - pub eth2_rpc: RPC, E>, + pub eth2_rpc: RPC, /// Discv5 Discovery protocol. pub discovery: Discovery, /// Keep regular connection to peers and disconnect if absent. diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index dbf7c38226..2868c616bd 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -21,7 +21,7 @@ use crate::types::{ use crate::EnrExt; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash}; -use api_types::{PeerRequestId, Request, RequestId, Response}; +use api_types::{AppRequestId, PeerRequestId, Request, RequestId, Response}; use futures::stream::StreamExt; use gossipsub::{ IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError, @@ -57,7 +57,7 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10; /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] -pub enum NetworkEvent { +pub enum NetworkEvent { /// We have successfully dialed and connected to a peer. PeerConnectedOutgoing(PeerId), /// A peer has successfully dialed and connected to us. @@ -67,7 +67,7 @@ pub enum NetworkEvent { /// An RPC Request that was sent failed. RPCFailed { /// The id of the failed request. - id: AppReqId, + id: AppRequestId, /// The peer to which this request was sent. peer_id: PeerId, /// The error of the failed request. @@ -85,7 +85,7 @@ pub enum NetworkEvent { /// Peer that sent the response. peer_id: PeerId, /// Id of the request to which the peer is responding. - id: AppReqId, + id: AppRequestId, /// Response the peer sent. response: Response, }, @@ -108,8 +108,8 @@ pub enum NetworkEvent { /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. -pub struct Network { - swarm: libp2p::swarm::Swarm>, +pub struct Network { + swarm: libp2p::swarm::Swarm>, /* Auxiliary Fields */ /// A collections of variables accessible outside the network service. network_globals: Arc>, @@ -132,7 +132,7 @@ pub struct Network { } /// Implements the combined behaviour for the libp2p service. -impl Network { +impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, @@ -592,7 +592,7 @@ impl Network { &mut self.swarm.behaviour_mut().gossipsub } /// The Eth2 RPC specified in the wire-0 protocol. - pub fn eth2_rpc_mut(&mut self) -> &mut RPC, E> { + pub fn eth2_rpc_mut(&mut self) -> &mut RPC { &mut self.swarm.behaviour_mut().eth2_rpc } /// Discv5 Discovery protocol. @@ -613,7 +613,7 @@ impl Network { &self.swarm.behaviour().gossipsub } /// The Eth2 RPC specified in the wire-0 protocol. - pub fn eth2_rpc(&self) -> &RPC, E> { + pub fn eth2_rpc(&self) -> &RPC { &self.swarm.behaviour().eth2_rpc } /// Discv5 Discovery protocol. @@ -920,9 +920,9 @@ impl Network { pub fn send_request( &mut self, peer_id: PeerId, - request_id: AppReqId, + request_id: AppRequestId, request: Request, - ) -> Result<(), (AppReqId, RPCError)> { + ) -> Result<(), (AppRequestId, RPCError)> { // Check if the peer is connected before sending an RPC request if !self.swarm.is_connected(&peer_id) { return Err((request_id, RPCError::Disconnected)); @@ -1157,10 +1157,10 @@ impl Network { #[must_use = "return the response"] fn build_response( &mut self, - id: RequestId, + id: RequestId, peer_id: PeerId, response: Response, - ) -> Option> { + ) -> Option> { match id { RequestId::Application(id) => Some(NetworkEvent::ResponseReceived { peer_id, @@ -1178,7 +1178,7 @@ impl Network { id: PeerRequestId, peer_id: PeerId, request: Request, - ) -> NetworkEvent { + ) -> NetworkEvent { // Increment metrics match &request { Request::Status(_) => { @@ -1244,7 +1244,7 @@ impl Network { /* Sub-behaviour event handling functions */ /// Handle a gossipsub event. - fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option> { + fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option> { match event { gossipsub::Event::Message { propagation_source, @@ -1383,10 +1383,7 @@ impl Network { } /// Handle an RPC event. - fn inject_rpc_event( - &mut self, - event: RPCMessage, E>, - ) -> Option> { + fn inject_rpc_event(&mut self, event: RPCMessage) -> Option> { let peer_id = event.peer_id; // Do not permit Inbound events from peers that are being disconnected, or RPC requests. @@ -1619,10 +1616,7 @@ impl Network { } /// Handle an identify event. - fn inject_identify_event( - &mut self, - event: identify::Event, - ) -> Option> { + fn inject_identify_event(&mut self, event: identify::Event) -> Option> { match event { identify::Event::Received { peer_id, mut info } => { if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { @@ -1643,7 +1637,7 @@ impl Network { } /// Handle a peer manager event. - fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option> { + fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option> { match event { PeerManagerEvent::PeerConnectedIncoming(peer_id) => { Some(NetworkEvent::PeerConnectedIncoming(peer_id)) @@ -1747,7 +1741,7 @@ impl Network { /// Poll the p2p networking stack. /// /// This will poll the swarm and do maintenance routines. - pub fn poll_network(&mut self, cx: &mut Context) -> Poll> { + pub fn poll_network(&mut self, cx: &mut Context) -> Poll> { while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) { let maybe_event = match swarm_event { SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { @@ -1889,7 +1883,7 @@ impl Network { Poll::Pending } - pub async fn next_event(&mut self) -> NetworkEvent { + pub async fn next_event(&mut self) -> NetworkEvent { futures::future::poll_fn(|cx| self.poll_network(cx)).await } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index 32e3a03466..25431226ca 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -13,7 +13,6 @@ use types::{ }; type E = MinimalEthSpec; -type ReqId = usize; use tempfile::Builder as TempBuilder; @@ -44,14 +43,14 @@ pub fn fork_context(fork_name: ForkName) -> ForkContext { } pub struct Libp2pInstance( - LibP2PService, + LibP2PService, #[allow(dead_code)] // This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute. async_channel::Sender<()>, ); impl std::ops::Deref for Libp2pInstance { - type Target = LibP2PService; + type Target = LibP2PService; fn deref(&self) -> &Self::Target { &self.0 } @@ -125,7 +124,7 @@ pub async fn build_libp2p_instance( } #[allow(dead_code)] -pub fn get_enr(node: &LibP2PService) -> Enr { +pub fn get_enr(node: &LibP2PService) -> Enr { node.local_enr() } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 527b853dc3..12a1c59393 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -4,6 +4,7 @@ mod common; use common::Protocol; use lighthouse_network::rpc::methods::*; +use lighthouse_network::service::api_types::AppRequestId; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; use ssz::Encode; @@ -99,12 +100,12 @@ fn test_tcp_status_rpc() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, 10, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { peer_id: _, - id: 10, + id: AppRequestId::Router, response, } => { // Should receive the RPC response @@ -196,7 +197,6 @@ fn test_tcp_blocks_by_range_chunked_rpc() { // keep count of the number of messages received let mut messages_received = 0; - let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -205,7 +205,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, request_id, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { @@ -323,7 +323,6 @@ fn test_blobs_by_range_chunked_rpc() { // keep count of the number of messages received let mut messages_received = 0; - let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -332,7 +331,7 @@ fn test_blobs_by_range_chunked_rpc() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, request_id, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { @@ -433,7 +432,6 @@ fn test_tcp_blocks_by_range_over_limit() { let rpc_response_bellatrix_large = Response::BlocksByRange(Some(Arc::new(signed_full_block))); - let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -442,12 +440,12 @@ fn test_tcp_blocks_by_range_over_limit() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, request_id, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } // The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE NetworkEvent::RPCFailed { id, .. } => { - assert_eq!(id, request_id); + assert!(matches!(id, AppRequestId::Router)); return; } _ => {} // Ignore other behaviour events @@ -528,7 +526,6 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { // keep count of the number of messages received let mut messages_received: u64 = 0; - let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -537,7 +534,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, request_id, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { @@ -668,12 +665,12 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, 10, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { peer_id: _, - id: 10, + id: AppRequestId::Router, response, } => match response { Response::BlocksByRange(Some(_)) => { @@ -793,12 +790,12 @@ fn test_tcp_blocks_by_root_chunked_rpc() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, 6, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { peer_id: _, - id: 6, + id: AppRequestId::Router, response, } => match response { Response::BlocksByRoot(Some(_)) => { @@ -926,12 +923,12 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { // Send a STATUS message debug!(log, "Sending RPC"); sender - .send_request(peer_id, 10, rpc_request.clone()) + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); } NetworkEvent::ResponseReceived { peer_id: _, - id: 10, + id: AppRequestId::Router, response, } => { debug!(log, "Sender received a response"); diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 1937fc11cf..e125c13f4c 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -7,9 +7,8 @@ use crate::error; use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}; -use crate::service::{NetworkMessage, RequestId}; +use crate::service::NetworkMessage; use crate::status::status_message; -use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_processor::{ @@ -18,6 +17,7 @@ use beacon_processor::{ use futures::prelude::*; use lighthouse_network::rpc::*; use lighthouse_network::{ + service::api_types::{AppRequestId, SyncRequestId}, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use logging::TimeLatch; @@ -61,13 +61,13 @@ pub enum RouterMessage { /// An RPC response has been received. RPCResponseReceived { peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, response: Response, }, /// An RPC request failed RPCFailed { peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, error: RPCError, }, /// A gossip message has been received. The fields are: message id, the peer that sent us this @@ -235,7 +235,7 @@ impl Router { fn handle_rpc_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, response: Response, ) { match response { @@ -448,9 +448,9 @@ impl Router { /// An error occurred during an RPC request. The state is maintained by the sync manager, so /// this function notifies the sync manager of the error. - pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: AppRequestId, error: RPCError) { // Check if the failed RPC belongs to sync - if let RequestId::Sync(request_id) = request_id { + if let AppRequestId::Sync(request_id) = request_id { self.send_to_sync(SyncMessage::RpcError { peer_id, request_id, @@ -488,18 +488,18 @@ impl Router { pub fn on_blocks_by_range_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, beacon_block: Option>>, ) { let request_id = match request_id { - RequestId::Sync(sync_id) => match sync_id { - SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => { + AppRequestId::Sync(sync_id) => match sync_id { + SyncRequestId::SingleBlock { .. } | SyncRequestId::SingleBlob { .. } => { crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id); return; } - id @ SyncId::RangeBlockAndBlobs { .. } => id, + id @ SyncRequestId::RangeBlockAndBlobs { .. } => id, }, - RequestId::Router => { + AppRequestId::Router => { crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id); return; } @@ -522,7 +522,7 @@ impl Router { pub fn on_blobs_by_range_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, blob_sidecar: Option>>, ) { trace!( @@ -531,7 +531,7 @@ impl Router { "peer" => %peer_id, ); - if let RequestId::Sync(id) = request_id { + if let AppRequestId::Sync(id) = request_id { self.send_to_sync(SyncMessage::RpcBlob { peer_id, request_id: id, @@ -550,22 +550,22 @@ impl Router { pub fn on_blocks_by_root_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, beacon_block: Option>>, ) { let request_id = match request_id { - RequestId::Sync(sync_id) => match sync_id { - id @ SyncId::SingleBlock { .. } => id, - SyncId::RangeBlockAndBlobs { .. } => { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SingleBlock { .. } => id, + SyncRequestId::RangeBlockAndBlobs { .. } => { crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id); return; } - SyncId::SingleBlob { .. } => { + SyncRequestId::SingleBlob { .. } => { crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id); return; } }, - RequestId::Router => { + AppRequestId::Router => { crit!(self.log, "All BBRoot requests belong to sync"; "peer_id" => %peer_id); return; } @@ -588,22 +588,22 @@ impl Router { pub fn on_blobs_by_root_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: AppRequestId, blob_sidecar: Option>>, ) { let request_id = match request_id { - RequestId::Sync(sync_id) => match sync_id { - id @ SyncId::SingleBlob { .. } => id, - SyncId::SingleBlock { .. } => { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SingleBlob { .. } => id, + SyncRequestId::SingleBlock { .. } => { crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id); return; } - SyncId::RangeBlockAndBlobs { .. } => { + SyncRequestId::RangeBlockAndBlobs { .. } => { crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id); return; } }, - RequestId::Router => { + AppRequestId::Router => { crit!(self.log, "All BlobsByRoot requests belong to sync"; "peer_id" => %peer_id); return; } @@ -667,7 +667,7 @@ impl HandlerNetworkContext { pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { self.inform_network(NetworkMessage::SendRequest { peer_id, - request_id: RequestId::Router, + request_id: AppRequestId::Router, request, }) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e215f25387..e522285a9e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,4 +1,3 @@ -use super::sync::manager::RequestId as SyncId; use crate::nat; use crate::network_beacon_processor::InvalidBlockStorage; use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; @@ -23,6 +22,7 @@ use lighthouse_network::{ Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet, }; use lighthouse_network::{ + service::api_types::AppRequestId, types::{core_topics_to_subscribe, GossipEncoding, GossipTopic}, MessageId, NetworkEvent, NetworkGlobals, PeerId, }; @@ -51,13 +51,6 @@ const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2; /// able to run tens of thousands of validators on one BN. const VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE: usize = 65_536; -/// Application level requests sent to the network. -#[derive(Debug, Clone, Copy)] -pub enum RequestId { - Sync(SyncId), - Router, -} - /// Types of messages that the network service can receive. #[derive(Debug, IntoStaticStr)] #[strum(serialize_all = "snake_case")] @@ -69,7 +62,7 @@ pub enum NetworkMessage { SendRequest { peer_id: PeerId, request: Request, - request_id: RequestId, + request_id: AppRequestId, }, /// Send a successful Response to the libp2p service. SendResponse { @@ -168,7 +161,7 @@ pub struct NetworkService { /// A reference to the underlying beacon chain. beacon_chain: Arc>, /// The underlying libp2p service that drives all the network interactions. - libp2p: Network, + libp2p: Network, /// An attestation and subnet manager service. attestation_service: AttestationService, /// A sync committeee subnet manager service. @@ -499,7 +492,7 @@ impl NetworkService { /// Handle an event received from the network. async fn on_libp2p_event( &mut self, - ev: NetworkEvent, + ev: NetworkEvent, shutdown_sender: &mut Sender, ) { match ev { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index fe133e8e1c..356380546a 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -9,7 +9,7 @@ //! sync as failed, log an error and attempt to retry once a new peer joins the node. use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::manager::{BatchProcessResult, Id}; +use crate::sync::manager::BatchProcessResult; use crate::sync::network_context::RangeRequestId; use crate::sync::network_context::SyncNetworkContext; use crate::sync::range_sync::{ @@ -17,6 +17,7 @@ use crate::sync::range_sync::{ }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use rand::seq::SliceRandom; diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index aef76fb0da..e94e9589c0 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,10 +2,10 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; -use crate::sync::manager::Id; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; +use lighthouse_network::service::api_types::Id; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; use types::SignedBeaconBlock; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 66b2d808d9..7093915ef2 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -28,12 +28,12 @@ use super::network_context::{RpcResponseResult, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use crate::sync::manager::{Id, SingleLookupReqId}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; +use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; @@ -107,6 +107,9 @@ pub struct BlockLookups { log: Logger, } +#[cfg(test)] +use lighthouse_network::service::api_types::Id; + #[cfg(test)] /// Tuple of `SingleLookupId`, requested block root, awaiting parent block root (if any), /// and list of peers that claim to have imported this set of block components. diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index e17991286a..0466636fb7 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,12 +1,12 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; -use crate::sync::block_lookups::Id; use crate::sync::network_context::{ LookupRequestResult, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, }; use beacon_chain::BeaconChainTypes; use derivative::Derivative; +use lighthouse_network::service::api_types::Id; use rand::seq::IteratorRandom; use std::collections::HashSet; use std::fmt::Debug; diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 470e10c55c..ef2822fe56 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,9 +1,6 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; -use crate::service::RequestId; -use crate::sync::manager::{ - BlockProcessType, RequestId as SyncRequestId, SingleLookupReqId, SyncManager, -}; +use crate::sync::manager::{BlockProcessType, SyncManager}; use crate::sync::SyncMessage; use crate::NetworkMessage; use std::sync::Arc; @@ -24,6 +21,7 @@ use beacon_chain::{ }; use beacon_processor::WorkEvent; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; +use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId}; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; use slog::info; @@ -550,7 +548,7 @@ impl TestRig { while let Ok(request_id) = self.pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { peer_id, - request_id: RequestId::Sync(id), + request_id: AppRequestId::Sync(id), .. } if *peer_id == disconnected_peer_id => Some(*id), _ => None, @@ -631,7 +629,7 @@ impl TestRig { NetworkMessage::SendRequest { peer_id: _, request: Request::BlocksByRoot(request), - request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), } if request.block_roots().to_vec().contains(&for_block) => Some(*id), _ => None, }) @@ -651,7 +649,7 @@ impl TestRig { NetworkMessage::SendRequest { peer_id: _, request: Request::BlobsByRoot(request), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), } if request .blob_ids .to_vec() @@ -676,7 +674,7 @@ impl TestRig { NetworkMessage::SendRequest { peer_id: _, request: Request::BlocksByRoot(request), - request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), } if request.block_roots().to_vec().contains(&for_block) => Some(*id), _ => None, }) @@ -698,7 +696,7 @@ impl TestRig { NetworkMessage::SendRequest { peer_id: _, request: Request::BlobsByRoot(request), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), } if request .blob_ids .to_vec() diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9540709294..ee538e8e28 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -53,6 +53,7 @@ use beacon_chain::{ }; use futures::StreamExt; use lighthouse_network::rpc::RPCError; +use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId}; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; @@ -78,25 +79,6 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; /// arbitrary number that covers a full slot, but allows recovery if sync get stuck for a few slots. const NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS: u64 = 30; -pub type Id = u32; - -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct SingleLookupReqId { - pub lookup_id: Id, - pub req_id: Id, -} - -/// Id of rpc requests sent by sync to the network. -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub enum RequestId { - /// Request searching for a block given a hash. - SingleBlock { id: SingleLookupReqId }, - /// Request searching for a set of blobs given a hash. - SingleBlob { id: SingleLookupReqId }, - /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { id: Id }, -} - #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -105,7 +87,7 @@ pub enum SyncMessage { /// A block has been received from the RPC. RpcBlock { - request_id: RequestId, + request_id: SyncRequestId, peer_id: PeerId, beacon_block: Option>>, seen_timestamp: Duration, @@ -113,7 +95,7 @@ pub enum SyncMessage { /// A blob has been received from the RPC. RpcBlob { - request_id: RequestId, + request_id: SyncRequestId, peer_id: PeerId, blob_sidecar: Option>>, seen_timestamp: Duration, @@ -135,7 +117,7 @@ pub enum SyncMessage { /// An RPC Error has occurred on a request. RpcError { peer_id: PeerId, - request_id: RequestId, + request_id: SyncRequestId, error: RPCError, }, @@ -342,16 +324,16 @@ impl SyncManager { } /// Handles RPC errors related to requests that were emitted from the sync manager. - fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); match request_id { - RequestId::SingleBlock { id } => { + SyncRequestId::SingleBlock { id } => { self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error)) } - RequestId::SingleBlob { id } => { + SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } - RequestId::RangeBlockAndBlobs { id } => { + SyncRequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { RangeRequestId::RangeSync { chain_id, batch_id } => { @@ -835,13 +817,13 @@ impl SyncManager { fn rpc_block_received( &mut self, - request_id: RequestId, + request_id: SyncRequestId, peer_id: PeerId, block: Option>>, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.on_single_block_response( + SyncRequestId::SingleBlock { id } => self.on_single_block_response( id, peer_id, match block { @@ -849,10 +831,10 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), - RequestId::SingleBlob { .. } => { + SyncRequestId::SingleBlob { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } - RequestId::RangeBlockAndBlobs { id } => { + SyncRequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, block.into()) } } @@ -877,16 +859,16 @@ impl SyncManager { fn rpc_blob_received( &mut self, - request_id: RequestId, + request_id: SyncRequestId, peer_id: PeerId, blob: Option>>, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { .. } => { + SyncRequestId::SingleBlock { .. } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } - RequestId::SingleBlob { id } => self.on_single_blob_response( + SyncRequestId::SingleBlob { id } => self.on_single_blob_response( id, peer_id, match blob { @@ -894,7 +876,7 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), - RequestId::RangeBlockAndBlobs { id } => { + SyncRequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } } @@ -978,7 +960,7 @@ impl SyncManager { "sender_id" => ?resp.sender_id, "error" => e.clone() ); - let id = RequestId::RangeBlockAndBlobs { id }; + let id = SyncRequestId::RangeBlockAndBlobs { id }; self.network.report_peer( peer_id, PeerAction::MidToleranceError, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6f89b954b3..33d56ae87e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,18 +4,18 @@ use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; -use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; -use crate::service::{NetworkMessage, RequestId}; +use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; -use crate::sync::manager::{BlockProcessType, SingleLookupReqId}; +use crate::sync::manager::BlockProcessType; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; +use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; pub use requests::LookupVerifyError; use slog::{debug, error, trace, warn}; @@ -246,7 +246,7 @@ impl SyncNetworkContext { ); let request = Request::Status(status_message.clone()); - let request_id = RequestId::Router; + let request_id = AppRequestId::Router; let _ = self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, @@ -274,7 +274,7 @@ impl SyncNetworkContext { .send(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRange(request.clone()), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; @@ -295,7 +295,7 @@ impl SyncNetworkContext { start_slot: *request.start_slot(), count: *request.count(), }), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; } @@ -424,7 +424,7 @@ impl SyncNetworkContext { .send(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; @@ -510,7 +510,7 @@ impl SyncNetworkContext { .send(NetworkMessage::SendRequest { peer_id, request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index baba8c9a62..6e377cc6cb 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,6 +1,6 @@ -use crate::sync::manager::Id; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use lighthouse_network::rpc::methods::BlocksByRangeRequest; +use lighthouse_network::service::api_types::Id; use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ea873bdca0..9204d41a90 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,12 +1,11 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::network_context::RangeRequestId; -use crate::sync::{ - manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, -}; +use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; +use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; use rand::seq::SliceRandom; use slog::{crit, debug, o, warn}; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 45d2918331..fa06af2495 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -44,12 +44,12 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::status::ToStatusMessage; -use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::GoodbyeReason; +use lighthouse_network::service::api_types::Id; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; use lru_cache::LRUTimeCache; @@ -380,7 +380,6 @@ where #[cfg(test)] mod tests { use crate::network_beacon_processor::NetworkBeaconProcessor; - use crate::service::RequestId; use crate::NetworkMessage; use super::*; @@ -391,7 +390,10 @@ mod tests { use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use beacon_chain::EngineState; use beacon_processor::WorkEvent as BeaconWorkEvent; - use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; + use lighthouse_network::service::api_types::SyncRequestId; + use lighthouse_network::{ + rpc::StatusMessage, service::api_types::AppRequestId, NetworkGlobals, + }; use slog::{o, Drain}; use slot_clock::TestingSlotClock; use std::collections::HashSet; @@ -517,7 +519,7 @@ mod tests { &mut self, expected_peer: &PeerId, fork_name: ForkName, - ) -> (RequestId, Option) { + ) -> (AppRequestId, Option) { let block_req_id = if let Ok(NetworkMessage::SendRequest { peer_id, request: _, @@ -550,12 +552,12 @@ mod tests { fn complete_range_block_and_blobs_response( &mut self, - block_req: RequestId, - blob_req_opt: Option, + block_req: AppRequestId, + blob_req_opt: Option, ) -> (ChainId, BatchId, Id) { if blob_req_opt.is_some() { match block_req { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { let _ = self .cx .range_block_and_blob_response(id, BlockOrBlob::Block(None)); @@ -571,7 +573,7 @@ mod tests { } } else { match block_req { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { let response = self .cx .range_block_and_blob_response(id, BlockOrBlob::Block(None))