Remove generic Id param from RequestId (#6032)

* rename RequestId's for better context,

and move them to lighthouse_network crate.

* remove unrequired generic AppReqId from RequestID
This commit is contained in:
João Oliveira
2024-07-09 00:56:14 +01:00
committed by GitHub
parent 48c55ae295
commit a59a61fef9
18 changed files with 175 additions and 170 deletions

View File

@@ -212,7 +212,7 @@ mod tests {
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{OutboundRequest, Ping, Protocol};
use crate::service::api_types::RequestId;
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::MainnetEthSpec;
@@ -225,15 +225,17 @@ mod tests {
ping_quota: Quota::n_every(1, 2),
..Default::default()
});
let mut limiter: SelfRateLimiter<RequestId<u64>, MainnetEthSpec> =
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, log).unwrap();
let peer_id = PeerId::random();
for i in 1..=5 {
for i in 1..=5u32 {
let _ = limiter.allows(
peer_id,
RequestId::Application(i),
OutboundRequest::Ping(Ping { data: i }),
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
})),
OutboundRequest::Ping(Ping { data: i as u64 }),
);
}
@@ -246,8 +248,13 @@ mod tests {
// Check that requests in the queue are ordered in the sequence 2, 3, 4, 5.
let mut iter = queue.iter();
for i in 2..=5 {
assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
for i in 2..=5u32 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id,
})) if id == i
));
}
assert_eq!(limiter.ready_requests.len(), 0);
@@ -267,7 +274,12 @@ mod tests {
// Check that requests in the queue are ordered in the sequence 3, 4, 5.
let mut iter = queue.iter();
for i in 3..=5 {
assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
));
}
assert_eq!(limiter.ready_requests.len(), 1);

View File

@@ -19,10 +19,36 @@ use crate::rpc::{
/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);
/// Identifier of a request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestId<AppReqId> {
Application(AppReqId),
pub type Id = u32;
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SingleLookupReqId {
pub lookup_id: Id,
pub req_id: Id,
}
/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum SyncRequestId {
/// Request searching for a block given a hash.
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
}
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]
pub enum AppRequestId {
Sync(SyncRequestId),
Router,
}
/// Global identifier of a request.
#[derive(Debug, Clone, Copy)]
pub enum RequestId {
Application(AppRequestId),
Internal,
}
@@ -142,7 +168,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RPCCodedResponse<E> {
}
}
impl<AppReqId: std::fmt::Debug> slog::Value for RequestId<AppReqId> {
impl slog::Value for RequestId {
fn serialize(
&self,
record: &slog::Record,

View File

@@ -1,6 +1,6 @@
use crate::discovery::Discovery;
use crate::peer_manager::PeerManager;
use crate::rpc::{ReqId, RPC};
use crate::rpc::RPC;
use crate::types::SnappyTransform;
use libp2p::identify;
@@ -16,9 +16,8 @@ pub type SubscriptionFilter =
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour<AppReqId, E>
pub(crate) struct Behaviour<E>
where
AppReqId: ReqId,
E: EthSpec,
{
/// Keep track of active and pending connections to enforce hard limits.
@@ -26,7 +25,7 @@ where
/// The peer manager that keeps track of peer's reputation and status.
pub peer_manager: PeerManager<E>,
/// The Eth2 RPC specified in the wire-0 protocol.
pub eth2_rpc: RPC<RequestId<AppReqId>, E>,
pub eth2_rpc: RPC<RequestId, E>,
/// Discv5 Discovery protocol.
pub discovery: Discovery<E>,
/// Keep regular connection to peers and disconnect if absent.

View File

@@ -21,7 +21,7 @@ use crate::types::{
use crate::EnrExt;
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use api_types::{PeerRequestId, Request, RequestId, Response};
use api_types::{AppRequestId, PeerRequestId, Request, RequestId, Response};
use futures::stream::StreamExt;
use gossipsub::{
IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
@@ -57,7 +57,7 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
/// The types of events than can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum NetworkEvent<AppReqId: ReqId, E: EthSpec> {
pub enum NetworkEvent<E: EthSpec> {
/// We have successfully dialed and connected to a peer.
PeerConnectedOutgoing(PeerId),
/// A peer has successfully dialed and connected to us.
@@ -67,7 +67,7 @@ pub enum NetworkEvent<AppReqId: ReqId, E: EthSpec> {
/// An RPC Request that was sent failed.
RPCFailed {
/// The id of the failed request.
id: AppReqId,
id: AppRequestId,
/// The peer to which this request was sent.
peer_id: PeerId,
/// The error of the failed request.
@@ -85,7 +85,7 @@ pub enum NetworkEvent<AppReqId: ReqId, E: EthSpec> {
/// Peer that sent the response.
peer_id: PeerId,
/// Id of the request to which the peer is responding.
id: AppReqId,
id: AppRequestId,
/// Response the peer sent.
response: Response<E>,
},
@@ -108,8 +108,8 @@ pub enum NetworkEvent<AppReqId: ReqId, E: EthSpec> {
/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
pub struct Network<AppReqId: ReqId, E: EthSpec> {
swarm: libp2p::swarm::Swarm<Behaviour<AppReqId, E>>,
pub struct Network<E: EthSpec> {
swarm: libp2p::swarm::Swarm<Behaviour<E>>,
/* Auxiliary Fields */
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<E>>,
@@ -132,7 +132,7 @@ pub struct Network<AppReqId: ReqId, E: EthSpec> {
}
/// Implements the combined behaviour for the libp2p service.
impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
impl<E: EthSpec> Network<E> {
pub async fn new(
executor: task_executor::TaskExecutor,
mut ctx: ServiceContext<'_>,
@@ -592,7 +592,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
&mut self.swarm.behaviour_mut().gossipsub
}
/// The Eth2 RPC specified in the wire-0 protocol.
pub fn eth2_rpc_mut(&mut self) -> &mut RPC<RequestId<AppReqId>, E> {
pub fn eth2_rpc_mut(&mut self) -> &mut RPC<RequestId, E> {
&mut self.swarm.behaviour_mut().eth2_rpc
}
/// Discv5 Discovery protocol.
@@ -613,7 +613,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
&self.swarm.behaviour().gossipsub
}
/// The Eth2 RPC specified in the wire-0 protocol.
pub fn eth2_rpc(&self) -> &RPC<RequestId<AppReqId>, E> {
pub fn eth2_rpc(&self) -> &RPC<RequestId, E> {
&self.swarm.behaviour().eth2_rpc
}
/// Discv5 Discovery protocol.
@@ -920,9 +920,9 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
pub fn send_request(
&mut self,
peer_id: PeerId,
request_id: AppReqId,
request_id: AppRequestId,
request: Request,
) -> Result<(), (AppReqId, RPCError)> {
) -> Result<(), (AppRequestId, RPCError)> {
// Check if the peer is connected before sending an RPC request
if !self.swarm.is_connected(&peer_id) {
return Err((request_id, RPCError::Disconnected));
@@ -1157,10 +1157,10 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
#[must_use = "return the response"]
fn build_response(
&mut self,
id: RequestId<AppReqId>,
id: RequestId,
peer_id: PeerId,
response: Response<E>,
) -> Option<NetworkEvent<AppReqId, E>> {
) -> Option<NetworkEvent<E>> {
match id {
RequestId::Application(id) => Some(NetworkEvent::ResponseReceived {
peer_id,
@@ -1178,7 +1178,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
id: PeerRequestId,
peer_id: PeerId,
request: Request,
) -> NetworkEvent<AppReqId, E> {
) -> NetworkEvent<E> {
// Increment metrics
match &request {
Request::Status(_) => {
@@ -1244,7 +1244,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
/* Sub-behaviour event handling functions */
/// Handle a gossipsub event.
fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option<NetworkEvent<AppReqId, E>> {
fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option<NetworkEvent<E>> {
match event {
gossipsub::Event::Message {
propagation_source,
@@ -1383,10 +1383,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
}
/// Handle an RPC event.
fn inject_rpc_event(
&mut self,
event: RPCMessage<RequestId<AppReqId>, E>,
) -> Option<NetworkEvent<AppReqId, E>> {
fn inject_rpc_event(&mut self, event: RPCMessage<RequestId, E>) -> Option<NetworkEvent<E>> {
let peer_id = event.peer_id;
// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
@@ -1619,10 +1616,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
}
/// Handle an identify event.
fn inject_identify_event(
&mut self,
event: identify::Event,
) -> Option<NetworkEvent<AppReqId, E>> {
fn inject_identify_event(&mut self, event: identify::Event) -> Option<NetworkEvent<E>> {
match event {
identify::Event::Received { peer_id, mut info } => {
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
@@ -1643,7 +1637,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
}
/// Handle a peer manager event.
fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option<NetworkEvent<AppReqId, E>> {
fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option<NetworkEvent<E>> {
match event {
PeerManagerEvent::PeerConnectedIncoming(peer_id) => {
Some(NetworkEvent::PeerConnectedIncoming(peer_id))
@@ -1747,7 +1741,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
/// Poll the p2p networking stack.
///
/// This will poll the swarm and do maintenance routines.
pub fn poll_network(&mut self, cx: &mut Context) -> Poll<NetworkEvent<AppReqId, E>> {
pub fn poll_network(&mut self, cx: &mut Context) -> Poll<NetworkEvent<E>> {
while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) {
let maybe_event = match swarm_event {
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
@@ -1889,7 +1883,7 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
Poll::Pending
}
pub async fn next_event(&mut self) -> NetworkEvent<AppReqId, E> {
pub async fn next_event(&mut self) -> NetworkEvent<E> {
futures::future::poll_fn(|cx| self.poll_network(cx)).await
}
}

View File

@@ -13,7 +13,6 @@ use types::{
};
type E = MinimalEthSpec;
type ReqId = usize;
use tempfile::Builder as TempBuilder;
@@ -44,14 +43,14 @@ pub fn fork_context(fork_name: ForkName) -> ForkContext {
}
pub struct Libp2pInstance(
LibP2PService<ReqId, E>,
LibP2PService<E>,
#[allow(dead_code)]
// This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute.
async_channel::Sender<()>,
);
impl std::ops::Deref for Libp2pInstance {
type Target = LibP2PService<ReqId, E>;
type Target = LibP2PService<E>;
fn deref(&self) -> &Self::Target {
&self.0
}
@@ -125,7 +124,7 @@ pub async fn build_libp2p_instance(
}
#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<ReqId, E>) -> Enr {
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
node.local_enr()
}

View File

@@ -4,6 +4,7 @@ mod common;
use common::Protocol;
use lighthouse_network::rpc::methods::*;
use lighthouse_network::service::api_types::AppRequestId;
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
@@ -99,12 +100,12 @@ fn test_tcp_status_rpc() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 10, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
id: 10,
id: AppRequestId::Router,
response,
} => {
// Should receive the RPC response
@@ -196,7 +197,6 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
// keep count of the number of messages received
let mut messages_received = 0;
let request_id = messages_to_send as usize;
// build the sender future
let sender_future = async {
loop {
@@ -205,7 +205,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, request_id, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
@@ -323,7 +323,6 @@ fn test_blobs_by_range_chunked_rpc() {
// keep count of the number of messages received
let mut messages_received = 0;
let request_id = messages_to_send as usize;
// build the sender future
let sender_future = async {
loop {
@@ -332,7 +331,7 @@ fn test_blobs_by_range_chunked_rpc() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, request_id, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
@@ -433,7 +432,6 @@ fn test_tcp_blocks_by_range_over_limit() {
let rpc_response_bellatrix_large =
Response::BlocksByRange(Some(Arc::new(signed_full_block)));
let request_id = messages_to_send as usize;
// build the sender future
let sender_future = async {
loop {
@@ -442,12 +440,12 @@ fn test_tcp_blocks_by_range_over_limit() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, request_id, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
// The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE
NetworkEvent::RPCFailed { id, .. } => {
assert_eq!(id, request_id);
assert!(matches!(id, AppRequestId::Router));
return;
}
_ => {} // Ignore other behaviour events
@@ -528,7 +526,6 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
// keep count of the number of messages received
let mut messages_received: u64 = 0;
let request_id = messages_to_send as usize;
// build the sender future
let sender_future = async {
loop {
@@ -537,7 +534,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, request_id, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
@@ -668,12 +665,12 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 10, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
id: 10,
id: AppRequestId::Router,
response,
} => match response {
Response::BlocksByRange(Some(_)) => {
@@ -793,12 +790,12 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 6, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
id: 6,
id: AppRequestId::Router,
response,
} => match response {
Response::BlocksByRoot(Some(_)) => {
@@ -926,12 +923,12 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 10, rpc_request.clone())
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
id: 10,
id: AppRequestId::Router,
response,
} => {
debug!(log, "Sender received a response");

View File

@@ -7,9 +7,8 @@
use crate::error;
use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor};
use crate::service::{NetworkMessage, RequestId};
use crate::service::NetworkMessage;
use crate::status::status_message;
use crate::sync::manager::RequestId as SyncId;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_processor::{
@@ -18,6 +17,7 @@ use beacon_processor::{
use futures::prelude::*;
use lighthouse_network::rpc::*;
use lighthouse_network::{
service::api_types::{AppRequestId, SyncRequestId},
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use logging::TimeLatch;
@@ -61,13 +61,13 @@ pub enum RouterMessage<E: EthSpec> {
/// An RPC response has been received.
RPCResponseReceived {
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
response: Response<E>,
},
/// An RPC request failed
RPCFailed {
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
error: RPCError,
},
/// A gossip message has been received. The fields are: message id, the peer that sent us this
@@ -235,7 +235,7 @@ impl<T: BeaconChainTypes> Router<T> {
fn handle_rpc_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
response: Response<T::EthSpec>,
) {
match response {
@@ -448,9 +448,9 @@ impl<T: BeaconChainTypes> Router<T> {
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
/// this function notifies the sync manager of the error.
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: AppRequestId, error: RPCError) {
// Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id {
if let AppRequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError {
peer_id,
request_id,
@@ -488,18 +488,18 @@ impl<T: BeaconChainTypes> Router<T> {
pub fn on_blocks_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
AppRequestId::Sync(sync_id) => match sync_id {
SyncRequestId::SingleBlock { .. } | SyncRequestId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
}
id @ SyncId::RangeBlockAndBlobs { .. } => id,
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
AppRequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
return;
}
@@ -522,7 +522,7 @@ impl<T: BeaconChainTypes> Router<T> {
pub fn on_blobs_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
trace!(
@@ -531,7 +531,7 @@ impl<T: BeaconChainTypes> Router<T> {
"peer" => %peer_id,
);
if let RequestId::Sync(id) = request_id {
if let AppRequestId::Sync(id) = request_id {
self.send_to_sync(SyncMessage::RpcBlob {
peer_id,
request_id: id,
@@ -550,22 +550,22 @@ impl<T: BeaconChainTypes> Router<T> {
pub fn on_blocks_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::RangeBlockAndBlobs { .. } => {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SingleBlock { .. } => id,
SyncRequestId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } => {
SyncRequestId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
return;
}
},
RequestId::Router => {
AppRequestId::Router => {
crit!(self.log, "All BBRoot requests belong to sync"; "peer_id" => %peer_id);
return;
}
@@ -588,22 +588,22 @@ impl<T: BeaconChainTypes> Router<T> {
pub fn on_blobs_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
request_id: AppRequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SingleBlob { .. } => id,
SyncRequestId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::RangeBlockAndBlobs { .. } => {
SyncRequestId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
},
RequestId::Router => {
AppRequestId::Router => {
crit!(self.log, "All BlobsByRoot requests belong to sync"; "peer_id" => %peer_id);
return;
}
@@ -667,7 +667,7 @@ impl<E: EthSpec> HandlerNetworkContext<E> {
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
self.inform_network(NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Router,
request_id: AppRequestId::Router,
request,
})
}

View File

@@ -1,4 +1,3 @@
use super::sync::manager::RequestId as SyncId;
use crate::nat;
use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
@@ -23,6 +22,7 @@ use lighthouse_network::{
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
};
use lighthouse_network::{
service::api_types::AppRequestId,
types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
MessageId, NetworkEvent, NetworkGlobals, PeerId,
};
@@ -51,13 +51,6 @@ const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
/// able to run tens of thousands of validators on one BN.
const VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE: usize = 65_536;
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]
pub enum RequestId {
Sync(SyncId),
Router,
}
/// Types of messages that the network service can receive.
#[derive(Debug, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
@@ -69,7 +62,7 @@ pub enum NetworkMessage<E: EthSpec> {
SendRequest {
peer_id: PeerId,
request: Request,
request_id: RequestId,
request_id: AppRequestId,
},
/// Send a successful Response to the libp2p service.
SendResponse {
@@ -168,7 +161,7 @@ pub struct NetworkService<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
beacon_chain: Arc<BeaconChain<T>>,
/// The underlying libp2p service that drives all the network interactions.
libp2p: Network<RequestId, T::EthSpec>,
libp2p: Network<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// A sync committeee subnet manager service.
@@ -499,7 +492,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
/// Handle an event received from the network.
async fn on_libp2p_event(
&mut self,
ev: NetworkEvent<RequestId, T::EthSpec>,
ev: NetworkEvent<T::EthSpec>,
shutdown_sender: &mut Sender<ShutdownReason>,
) {
match ev {

View File

@@ -9,7 +9,7 @@
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::manager::{BatchProcessResult, Id};
use crate::sync::manager::BatchProcessResult;
use crate::sync::network_context::RangeRequestId;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::range_sync::{
@@ -17,6 +17,7 @@ use crate::sync::range_sync::{
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use rand::seq::SliceRandom;

View File

@@ -2,10 +2,10 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::Id;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::SignedBeaconBlock;

View File

@@ -28,12 +28,12 @@ use super::network_context::{RpcResponseResult, SyncNetworkContext};
use crate::metrics;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::manager::{Id, SingleLookupReqId};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
@@ -107,6 +107,9 @@ pub struct BlockLookups<T: BeaconChainTypes> {
log: Logger,
}
#[cfg(test)]
use lighthouse_network::service::api_types::Id;
#[cfg(test)]
/// Tuple of `SingleLookupId`, requested block root, awaiting parent block root (if any),
/// and list of peers that claim to have imported this set of block components.

View File

@@ -1,12 +1,12 @@
use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::{
LookupRequestResult, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext,
};
use beacon_chain::BeaconChainTypes;
use derivative::Derivative;
use lighthouse_network::service::api_types::Id;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;

View File

@@ -1,9 +1,6 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::RequestId;
use crate::sync::manager::{
BlockProcessType, RequestId as SyncRequestId, SingleLookupReqId, SyncManager,
};
use crate::sync::manager::{BlockProcessType, SyncManager};
use crate::sync::SyncMessage;
use crate::NetworkMessage;
use std::sync::Arc;
@@ -24,6 +21,7 @@ use beacon_chain::{
};
use beacon_processor::WorkEvent;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::types::SyncState;
use lighthouse_network::{NetworkGlobals, Request};
use slog::info;
@@ -550,7 +548,7 @@ impl TestRig {
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Sync(id),
request_id: AppRequestId::Sync(id),
..
} if *peer_id == disconnected_peer_id => Some(*id),
_ => None,
@@ -631,7 +629,7 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
_ => None,
})
@@ -651,7 +649,7 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
} if request
.blob_ids
.to_vec()
@@ -676,7 +674,7 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
_ => None,
})
@@ -698,7 +696,7 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
} if request
.blob_ids
.to_vec()

View File

@@ -53,6 +53,7 @@ use beacon_chain::{
};
use futures::StreamExt;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
@@ -78,25 +79,6 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32;
/// arbitrary number that covers a full slot, but allows recovery if sync get stuck for a few slots.
const NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS: u64 = 30;
pub type Id = u32;
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SingleLookupReqId {
pub lookup_id: Id,
pub req_id: Id,
}
/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId {
/// Request searching for a block given a hash.
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
}
#[derive(Debug)]
/// A message that can be sent to the sync manager thread.
pub enum SyncMessage<E: EthSpec> {
@@ -105,7 +87,7 @@ pub enum SyncMessage<E: EthSpec> {
/// A block has been received from the RPC.
RpcBlock {
request_id: RequestId,
request_id: SyncRequestId,
peer_id: PeerId,
beacon_block: Option<Arc<SignedBeaconBlock<E>>>,
seen_timestamp: Duration,
@@ -113,7 +95,7 @@ pub enum SyncMessage<E: EthSpec> {
/// A blob has been received from the RPC.
RpcBlob {
request_id: RequestId,
request_id: SyncRequestId,
peer_id: PeerId,
blob_sidecar: Option<Arc<BlobSidecar<E>>>,
seen_timestamp: Duration,
@@ -135,7 +117,7 @@ pub enum SyncMessage<E: EthSpec> {
/// An RPC Error has occurred on a request.
RpcError {
peer_id: PeerId,
request_id: RequestId,
request_id: SyncRequestId,
error: RPCError,
},
@@ -342,16 +324,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
/// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) {
trace!(self.log, "Sync manager received a failed RPC");
match request_id {
RequestId::SingleBlock { id } => {
SyncRequestId::SingleBlock { id } => {
self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error))
}
RequestId::SingleBlob { id } => {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
RequestId::RangeBlockAndBlobs { id } => {
SyncRequestId::RangeBlockAndBlobs { id } => {
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
RangeRequestId::RangeSync { chain_id, batch_id } => {
@@ -835,13 +817,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn rpc_block_received(
&mut self,
request_id: RequestId,
request_id: SyncRequestId,
peer_id: PeerId,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.on_single_block_response(
SyncRequestId::SingleBlock { id } => self.on_single_block_response(
id,
peer_id,
match block {
@@ -849,10 +831,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
RequestId::SingleBlob { .. } => {
SyncRequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
RequestId::RangeBlockAndBlobs { id } => {
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block.into())
}
}
@@ -877,16 +859,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn rpc_blob_received(
&mut self,
request_id: RequestId,
request_id: SyncRequestId,
peer_id: PeerId,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { .. } => {
SyncRequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
RequestId::SingleBlob { id } => self.on_single_blob_response(
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
id,
peer_id,
match blob {
@@ -894,7 +876,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
RequestId::RangeBlockAndBlobs { id } => {
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, blob.into())
}
}
@@ -978,7 +960,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"sender_id" => ?resp.sender_id,
"error" => e.clone()
);
let id = RequestId::RangeBlockAndBlobs { id };
let id = SyncRequestId::RangeBlockAndBlobs { id };
self.network.report_peer(
peer_id,
PeerAction::MidToleranceError,

View File

@@ -4,18 +4,18 @@
use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest};
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use crate::sync::manager::BlockProcessType;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use slog::{debug, error, trace, warn};
@@ -246,7 +246,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let request = Request::Status(status_message.clone());
let request_id = RequestId::Router;
let request_id = AppRequestId::Router;
let _ = self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
@@ -274,7 +274,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRange(request.clone()),
request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -295,7 +295,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
start_slot: *request.start_slot(),
count: *request.count(),
}),
request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
}
@@ -424,7 +424,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRoot(request.into_request(&self.chain.spec)),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -510,7 +510,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;

View File

@@ -1,6 +1,6 @@
use crate::sync::manager::Id;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

View File

@@ -1,12 +1,11 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};

View File

@@ -44,12 +44,12 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::status::ToStatusMessage;
use crate::sync::manager::Id;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::rpc::GoodbyeReason;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use lighthouse_network::SyncInfo;
use lru_cache::LRUTimeCache;
@@ -380,7 +380,6 @@ where
#[cfg(test)]
mod tests {
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::RequestId;
use crate::NetworkMessage;
use super::*;
@@ -391,7 +390,10 @@ mod tests {
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_chain::EngineState;
use beacon_processor::WorkEvent as BeaconWorkEvent;
use lighthouse_network::{rpc::StatusMessage, NetworkGlobals};
use lighthouse_network::service::api_types::SyncRequestId;
use lighthouse_network::{
rpc::StatusMessage, service::api_types::AppRequestId, NetworkGlobals,
};
use slog::{o, Drain};
use slot_clock::TestingSlotClock;
use std::collections::HashSet;
@@ -517,7 +519,7 @@ mod tests {
&mut self,
expected_peer: &PeerId,
fork_name: ForkName,
) -> (RequestId, Option<RequestId>) {
) -> (AppRequestId, Option<AppRequestId>) {
let block_req_id = if let Ok(NetworkMessage::SendRequest {
peer_id,
request: _,
@@ -550,12 +552,12 @@ mod tests {
fn complete_range_block_and_blobs_response(
&mut self,
block_req: RequestId,
blob_req_opt: Option<RequestId>,
block_req: AppRequestId,
blob_req_opt: Option<AppRequestId>,
) -> (ChainId, BatchId, Id) {
if blob_req_opt.is_some() {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
let _ = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None));
@@ -571,7 +573,7 @@ mod tests {
}
} else {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
let response = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None))