mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-29 10:43:42 +00:00
RPC RequestId Cleanup (#7238)
I've been working at updating another library to latest Lighthouse and got very confused with RPC request Ids. There were types that had fields called `request_id` and `id`. And interchangeably could have types `PeerRequestId`, `rpc::RequestId`, `AppRequestId`, `api_types::RequestId` or even `Request.id`. I couldn't keep track of which Id was linked to what and what each type meant. So this PR mainly does a few things: - Changes the field naming to match the actual type. So any field that has an `AppRequestId` will be named `app_request_id` rather than `id` or `request_id` for example. - I simplified the types. I removed the two different `RequestId` types (one in Lighthouse_network the other in the rpc) and grouped them into one. It has one downside tho. I had to add a few unreachable lines of code in the beacon processor, which the extra type would prevent, but I feel like it might be worth it. Happy to add an extra type to avoid those few lines. - I also removed the concept of `PeerRequestId` which sometimes went alongside a `request_id`. There were times were had a `PeerRequest` and a `Request` being returned, both of which contain a `RequestId` so we had redundant information. I've simplified the logic by removing `PeerRequestId` and made a `ResponseId`. I think if you look at the code changes, it simplifies things a bit and removes the redundant extra info. I think with this PR things are a little bit easier to reasonable about what is going on with all these RPC Ids. NOTE: I did this with the help of AI, so probably should be checked
This commit is contained in:
@@ -122,6 +122,6 @@ pub use peer_manager::{
|
||||
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
|
||||
};
|
||||
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||
pub use service::api_types::{PeerRequestId, Response};
|
||||
pub use service::api_types::Response;
|
||||
pub use service::utils::*;
|
||||
pub use service::{Gossipsub, NetworkEvent};
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
|
||||
use super::outbound::OutboundRequestContainer;
|
||||
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
|
||||
use super::RequestId;
|
||||
use super::{RPCReceived, RPCSend, ReqId, Request};
|
||||
use super::{RPCReceived, RPCSend, ReqId};
|
||||
use crate::rpc::outbound::OutboundFramed;
|
||||
use crate::rpc::protocol::InboundFramed;
|
||||
use fnv::FnvHashMap;
|
||||
@@ -91,6 +90,11 @@ pub struct RPCHandler<Id, E>
|
||||
where
|
||||
E: EthSpec,
|
||||
{
|
||||
/// The PeerId matching this `ConnectionHandler`.
|
||||
peer_id: PeerId,
|
||||
|
||||
/// The ConnectionId matching this `ConnectionHandler`.
|
||||
connection_id: ConnectionId,
|
||||
/// The upgrade for inbound substreams.
|
||||
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
||||
|
||||
@@ -139,9 +143,6 @@ where
|
||||
|
||||
/// Timeout that will me used for inbound and outbound responses.
|
||||
resp_timeout: Duration,
|
||||
|
||||
/// Information about this handler for logging purposes.
|
||||
log_info: (PeerId, ConnectionId),
|
||||
}
|
||||
|
||||
enum HandlerState {
|
||||
@@ -228,6 +229,8 @@ where
|
||||
connection_id: ConnectionId,
|
||||
) -> Self {
|
||||
RPCHandler {
|
||||
connection_id,
|
||||
peer_id,
|
||||
listen_protocol,
|
||||
events_out: SmallVec::new(),
|
||||
dial_queue: SmallVec::new(),
|
||||
@@ -244,7 +247,6 @@ where
|
||||
fork_context,
|
||||
waker: None,
|
||||
resp_timeout,
|
||||
log_info: (peer_id, connection_id),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -255,8 +257,8 @@ where
|
||||
if !self.dial_queue.is_empty() {
|
||||
debug!(
|
||||
unsent_queued_requests = self.dial_queue.len(),
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Starting handler shutdown"
|
||||
);
|
||||
}
|
||||
@@ -306,8 +308,8 @@ where
|
||||
if !matches!(response, RpcResponse::StreamTermination(..)) {
|
||||
// the stream is closed after sending the expected number of responses
|
||||
trace!(%response, id = ?inbound_id,
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Inbound stream has expired. Response not sent");
|
||||
}
|
||||
return;
|
||||
@@ -324,8 +326,8 @@ where
|
||||
if matches!(self.state, HandlerState::Deactivated) {
|
||||
// we no longer send responses after the handler is deactivated
|
||||
debug!(%response, id = ?inbound_id,
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Response not sent. Deactivated handler");
|
||||
return;
|
||||
}
|
||||
@@ -394,8 +396,8 @@ where
|
||||
Poll::Ready(_) => {
|
||||
self.state = HandlerState::Deactivated;
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Shutdown timeout elapsed, Handler deactivated"
|
||||
);
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
@@ -445,8 +447,8 @@ where
|
||||
)));
|
||||
} else {
|
||||
crit!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
stream_id = ?outbound_id.get_ref(), "timed out substream not in the books");
|
||||
}
|
||||
}
|
||||
@@ -577,8 +579,8 @@ where
|
||||
// Its useful to log when the request was completed.
|
||||
if matches!(info.protocol, Protocol::BlocksByRange) {
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
duration = Instant::now()
|
||||
.duration_since(info.request_start_time)
|
||||
.as_secs(),
|
||||
@@ -587,8 +589,8 @@ where
|
||||
}
|
||||
if matches!(info.protocol, Protocol::BlobsByRange) {
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
duration = Instant::now()
|
||||
.duration_since(info.request_start_time)
|
||||
.as_secs(),
|
||||
@@ -617,16 +619,16 @@ where
|
||||
|
||||
if matches!(info.protocol, Protocol::BlocksByRange) {
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
duration = info.request_start_time.elapsed().as_secs(),
|
||||
"BlocksByRange Response failed"
|
||||
);
|
||||
}
|
||||
if matches!(info.protocol, Protocol::BlobsByRange) {
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
duration = info.request_start_time.elapsed().as_secs(),
|
||||
"BlobsByRange Response failed"
|
||||
);
|
||||
@@ -816,8 +818,8 @@ where
|
||||
}
|
||||
OutboundSubstreamState::Poisoned => {
|
||||
crit!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Poisoned outbound substream"
|
||||
);
|
||||
unreachable!("Coding Error: Outbound substream is poisoned")
|
||||
@@ -852,8 +854,8 @@ where
|
||||
&& self.dial_negotiated == 0
|
||||
{
|
||||
debug!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
"Goodbye sent, Handler deactivated"
|
||||
);
|
||||
self.state = HandlerState::Deactivated;
|
||||
@@ -986,12 +988,13 @@ where
|
||||
self.shutdown(None);
|
||||
}
|
||||
|
||||
self.events_out
|
||||
.push(HandlerEvent::Ok(RPCReceived::Request(Request {
|
||||
id: RequestId::next(),
|
||||
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
|
||||
super::InboundRequestId {
|
||||
connection_id: self.connection_id,
|
||||
substream_id: self.current_inbound_substream_id,
|
||||
r#type: req,
|
||||
})));
|
||||
},
|
||||
req,
|
||||
)));
|
||||
self.current_inbound_substream_id.0 += 1;
|
||||
}
|
||||
|
||||
@@ -1049,9 +1052,8 @@ where
|
||||
.is_some()
|
||||
{
|
||||
crit!(
|
||||
peer_id = %self.log_info.0,
|
||||
connection_id = %self.log_info.1,
|
||||
|
||||
peer_id = %self.peer_id,
|
||||
connection_id = %self.connection_id,
|
||||
id = ?self.current_outbound_substream_id, "Duplicate outbound substream id");
|
||||
}
|
||||
self.current_outbound_substream_id.0 += 1;
|
||||
|
||||
@@ -16,7 +16,6 @@ use libp2p::PeerId;
|
||||
use logging::crit;
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
@@ -49,8 +48,6 @@ mod protocol;
|
||||
mod rate_limiter;
|
||||
mod self_limiter;
|
||||
|
||||
static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(1);
|
||||
|
||||
/// Composite trait for a request id.
|
||||
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
|
||||
impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
|
||||
@@ -80,7 +77,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
|
||||
///
|
||||
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
|
||||
/// *inbound* substream over which it is managed.
|
||||
Request(Request<E>),
|
||||
Request(InboundRequestId, RequestType<E>),
|
||||
/// A response received from the outside.
|
||||
///
|
||||
/// The `Id` corresponds to the application given ID of the original request sent to the
|
||||
@@ -91,35 +88,30 @@ pub enum RPCReceived<Id, E: EthSpec> {
|
||||
EndOfStream(Id, ResponseTermination),
|
||||
}
|
||||
|
||||
/// Rpc `Request` identifier.
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct RequestId(usize);
|
||||
// An identifier for the inbound requests received via Rpc.
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct InboundRequestId {
|
||||
/// The connection ID of the peer that sent the request.
|
||||
connection_id: ConnectionId,
|
||||
/// The ID of the substream that sent the request.
|
||||
substream_id: SubstreamId,
|
||||
}
|
||||
|
||||
impl RequestId {
|
||||
/// Returns the next available [`RequestId`].
|
||||
pub fn next() -> Self {
|
||||
Self(NEXT_REQUEST_ID.fetch_add(1, Ordering::SeqCst))
|
||||
}
|
||||
|
||||
/// Creates an _unchecked_ [`RequestId`].
|
||||
impl InboundRequestId {
|
||||
/// Creates an _unchecked_ [`InboundRequestId`].
|
||||
///
|
||||
/// [`Rpc`] enforces that [`RequestId`]s are unique and not reused.
|
||||
/// [`Rpc`] enforces that [`InboundRequestId`]s are unique and not reused.
|
||||
/// This constructor does not, hence the _unchecked_.
|
||||
///
|
||||
/// It is primarily meant for allowing manual tests.
|
||||
pub fn new_unchecked(id: usize) -> Self {
|
||||
Self(id)
|
||||
pub fn new_unchecked(connection_id: usize, substream_id: usize) -> Self {
|
||||
Self {
|
||||
connection_id: ConnectionId::new_unchecked(connection_id),
|
||||
substream_id: SubstreamId::new(substream_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An Rpc Request.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Request<E: EthSpec> {
|
||||
pub id: RequestId,
|
||||
pub substream_id: SubstreamId,
|
||||
pub r#type: RequestType<E>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
@@ -136,7 +128,7 @@ pub struct RPCMessage<Id, E: EthSpec> {
|
||||
/// The peer that sent the message.
|
||||
pub peer_id: PeerId,
|
||||
/// Handler managing this message.
|
||||
pub conn_id: ConnectionId,
|
||||
pub connection_id: ConnectionId,
|
||||
/// The message that was sent.
|
||||
pub message: Result<RPCReceived<Id, E>, HandlerErr<Id>>,
|
||||
}
|
||||
@@ -215,14 +207,13 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
id: (ConnectionId, SubstreamId),
|
||||
_request_id: RequestId,
|
||||
event: RpcResponse<E>,
|
||||
request_id: InboundRequestId,
|
||||
response: RpcResponse<E>,
|
||||
) {
|
||||
self.events.push(ToSwarm::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::One(id.0),
|
||||
event: RPCSend::Response(id.1, event),
|
||||
handler: NotifyHandler::One(request_id.connection_id),
|
||||
event: RPCSend::Response(request_id.substream_id, response),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -387,7 +378,7 @@ where
|
||||
for (id, proto) in limiter.peer_disconnected(peer_id) {
|
||||
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id: connection_id,
|
||||
connection_id,
|
||||
message: Err(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
@@ -408,7 +399,7 @@ where
|
||||
} if *p == peer_id => {
|
||||
*event = ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id: connection_id,
|
||||
connection_id,
|
||||
message: Err(HandlerErr::Outbound {
|
||||
id: *request_id,
|
||||
proto: req.versioned_protocol().protocol(),
|
||||
@@ -424,21 +415,17 @@ where
|
||||
fn on_connection_handler_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
conn_id: ConnectionId,
|
||||
connection_id: ConnectionId,
|
||||
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
|
||||
) {
|
||||
match event {
|
||||
HandlerEvent::Ok(RPCReceived::Request(Request {
|
||||
id,
|
||||
substream_id,
|
||||
r#type,
|
||||
})) => {
|
||||
HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
|
||||
if let Some(limiter) = self.limiter.as_mut() {
|
||||
// check if the request is conformant to the quota
|
||||
match limiter.allows(&peer_id, &r#type) {
|
||||
match limiter.allows(&peer_id, &request_type) {
|
||||
Err(RateLimitedErr::TooLarge) => {
|
||||
// we set the batch sizes, so this is a coding/config err for most protocols
|
||||
let protocol = r#type.versioned_protocol().protocol();
|
||||
let protocol = request_type.versioned_protocol().protocol();
|
||||
if matches!(
|
||||
protocol,
|
||||
Protocol::BlocksByRange
|
||||
@@ -448,7 +435,7 @@ where
|
||||
| Protocol::BlobsByRoot
|
||||
| Protocol::DataColumnsByRoot
|
||||
) {
|
||||
debug!(request = %r#type, %protocol, "Request too large to process");
|
||||
debug!(request = %request_type, %protocol, "Request too large to process");
|
||||
} else {
|
||||
// Other protocols shouldn't be sending large messages, we should flag the peer kind
|
||||
crit!(%protocol, "Request size too large to ever be processed");
|
||||
@@ -457,8 +444,7 @@ where
|
||||
// the handler upon receiving the error code will send it back to the behaviour
|
||||
self.send_response(
|
||||
peer_id,
|
||||
(conn_id, substream_id),
|
||||
id,
|
||||
request_id,
|
||||
RpcResponse::Error(
|
||||
RpcErrorResponse::RateLimited,
|
||||
"Rate limited. Request too large".into(),
|
||||
@@ -467,13 +453,12 @@ where
|
||||
return;
|
||||
}
|
||||
Err(RateLimitedErr::TooSoon(wait_time)) => {
|
||||
debug!(request = %r#type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
|
||||
debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
|
||||
// send an error code to the peer.
|
||||
// the handler upon receiving the error code will send it back to the behaviour
|
||||
self.send_response(
|
||||
peer_id,
|
||||
(conn_id, substream_id),
|
||||
id,
|
||||
request_id,
|
||||
RpcResponse::Error(
|
||||
RpcErrorResponse::RateLimited,
|
||||
format!("Wait {:?}", wait_time).into(),
|
||||
@@ -487,12 +472,11 @@ where
|
||||
}
|
||||
|
||||
// If we received a Ping, we queue a Pong response.
|
||||
if let RequestType::Ping(_) = r#type {
|
||||
trace!(connection_id = %conn_id, %peer_id, "Received Ping, queueing Pong");
|
||||
if let RequestType::Ping(_) = request_type {
|
||||
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
|
||||
self.send_response(
|
||||
peer_id,
|
||||
(conn_id, substream_id),
|
||||
id,
|
||||
request_id,
|
||||
RpcResponse::Success(RpcSuccessResponse::Pong(Ping {
|
||||
data: self.seq_number,
|
||||
})),
|
||||
@@ -501,25 +485,21 @@ where
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
message: Ok(RPCReceived::Request(Request {
|
||||
id,
|
||||
substream_id,
|
||||
r#type,
|
||||
})),
|
||||
connection_id,
|
||||
message: Ok(RPCReceived::Request(request_id, request_type)),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Ok(rpc) => {
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
connection_id,
|
||||
message: Ok(rpc),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Err(err) => {
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
connection_id,
|
||||
message: Err(err),
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -207,7 +207,7 @@ mod tests {
|
||||
use crate::rpc::rate_limiter::Quota;
|
||||
use crate::rpc::self_limiter::SelfRateLimiter;
|
||||
use crate::rpc::{Ping, Protocol, RequestType};
|
||||
use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
|
||||
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
|
||||
use libp2p::PeerId;
|
||||
use logging::create_test_tracing_subscriber;
|
||||
use std::time::Duration;
|
||||
@@ -226,7 +226,7 @@ mod tests {
|
||||
Hash256::ZERO,
|
||||
&MainnetEthSpec::default_spec(),
|
||||
));
|
||||
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
|
||||
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
|
||||
SelfRateLimiter::new(config, fork_context).unwrap();
|
||||
let peer_id = PeerId::random();
|
||||
let lookup_id = 0;
|
||||
@@ -234,12 +234,12 @@ mod tests {
|
||||
for i in 1..=5u32 {
|
||||
let _ = limiter.allows(
|
||||
peer_id,
|
||||
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId {
|
||||
lookup_id,
|
||||
req_id: i,
|
||||
},
|
||||
})),
|
||||
}),
|
||||
RequestType::Ping(Ping { data: i as u64 }),
|
||||
);
|
||||
}
|
||||
@@ -256,9 +256,9 @@ mod tests {
|
||||
for i in 2..=5u32 {
|
||||
assert!(matches!(
|
||||
iter.next().unwrap().request_id,
|
||||
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId { req_id, .. },
|
||||
})) if req_id == i,
|
||||
}) if req_id == i,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -281,9 +281,9 @@ mod tests {
|
||||
for i in 3..=5 {
|
||||
assert!(matches!(
|
||||
iter.next().unwrap().request_id,
|
||||
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId { req_id, .. },
|
||||
})) if req_id == i,
|
||||
}) if req_id == i,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
use crate::rpc::{
|
||||
methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
|
||||
SubstreamId,
|
||||
};
|
||||
use libp2p::swarm::ConnectionId;
|
||||
use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::Arc;
|
||||
use types::{
|
||||
@@ -10,9 +6,6 @@ use types::{
|
||||
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
|
||||
};
|
||||
|
||||
/// Identifier of requests sent by a peer.
|
||||
pub type PeerRequestId = (ConnectionId, SubstreamId);
|
||||
|
||||
pub type Id = u32;
|
||||
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -130,12 +123,6 @@ pub struct CustodyRequester(pub SingleLookupReqId);
|
||||
pub enum AppRequestId {
|
||||
Sync(SyncRequestId),
|
||||
Router,
|
||||
}
|
||||
|
||||
/// Global identifier of a request.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum RequestId {
|
||||
Application(AppRequestId),
|
||||
Internal,
|
||||
}
|
||||
|
||||
|
||||
@@ -10,8 +10,9 @@ use crate::peer_manager::{
|
||||
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
||||
use crate::rpc::methods::MetadataRequest;
|
||||
use crate::rpc::{
|
||||
self, GoodbyeReason, HandlerErr, NetworkParams, Protocol, RPCError, RPCMessage, RPCReceived,
|
||||
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
||||
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
|
||||
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
|
||||
RpcSuccessResponse, RPC,
|
||||
};
|
||||
use crate::types::{
|
||||
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||
@@ -20,7 +21,7 @@ use crate::types::{
|
||||
use crate::EnrExt;
|
||||
use crate::Eth2Enr;
|
||||
use crate::{metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use api_types::{AppRequestId, PeerRequestId, RequestId, Response};
|
||||
use api_types::{AppRequestId, Response};
|
||||
use futures::stream::StreamExt;
|
||||
use gossipsub::{
|
||||
IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
|
||||
@@ -66,7 +67,7 @@ pub enum NetworkEvent<E: EthSpec> {
|
||||
/// An RPC Request that was sent failed.
|
||||
RPCFailed {
|
||||
/// The id of the failed request.
|
||||
id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
/// The peer to which this request was sent.
|
||||
peer_id: PeerId,
|
||||
/// The error of the failed request.
|
||||
@@ -76,15 +77,15 @@ pub enum NetworkEvent<E: EthSpec> {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the request. All responses to this request must use this id.
|
||||
id: PeerRequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
/// Request the peer sent.
|
||||
request: rpc::Request<E>,
|
||||
request_type: RequestType<E>,
|
||||
},
|
||||
ResponseReceived {
|
||||
/// Peer that sent the response.
|
||||
peer_id: PeerId,
|
||||
/// Id of the request to which the peer is responding.
|
||||
id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
/// Response the peer sent.
|
||||
response: Response<E>,
|
||||
},
|
||||
@@ -126,7 +127,7 @@ where
|
||||
/// The peer manager that keeps track of peer's reputation and status.
|
||||
pub peer_manager: PeerManager<E>,
|
||||
/// The Eth2 RPC specified in the wire-0 protocol.
|
||||
pub eth2_rpc: RPC<RequestId, E>,
|
||||
pub eth2_rpc: RPC<AppRequestId, E>,
|
||||
/// Discv5 Discovery protocol.
|
||||
pub discovery: Discovery<E>,
|
||||
/// Keep regular connection to peers and disconnect if absent.
|
||||
@@ -669,7 +670,7 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
pub fn eth2_rpc_mut(&mut self) -> &mut RPC<RequestId, E> {
|
||||
pub fn eth2_rpc_mut(&mut self) -> &mut RPC<AppRequestId, E> {
|
||||
&mut self.swarm.behaviour_mut().eth2_rpc
|
||||
}
|
||||
/// Discv5 Discovery protocol.
|
||||
@@ -720,7 +721,7 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
pub fn eth2_rpc(&self) -> &RPC<RequestId, E> {
|
||||
pub fn eth2_rpc(&self) -> &RPC<AppRequestId, E> {
|
||||
&self.swarm.behaviour().eth2_rpc
|
||||
}
|
||||
/// Discv5 Discovery protocol.
|
||||
@@ -1104,16 +1105,16 @@ impl<E: EthSpec> Network<E> {
|
||||
pub fn send_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
request: RequestType<E>,
|
||||
) -> Result<(), (AppRequestId, RPCError)> {
|
||||
// Check if the peer is connected before sending an RPC request
|
||||
if !self.swarm.is_connected(&peer_id) {
|
||||
return Err((request_id, RPCError::Disconnected));
|
||||
return Err((app_request_id, RPCError::Disconnected));
|
||||
}
|
||||
|
||||
self.eth2_rpc_mut()
|
||||
.send_request(peer_id, RequestId::Application(request_id), request);
|
||||
.send_request(peer_id, app_request_id, request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1127,12 +1128,11 @@ impl<E: EthSpec> Network<E> {
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
id: PeerRequestId,
|
||||
request_id: rpc::RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
response: Response<E>,
|
||||
) {
|
||||
self.eth2_rpc_mut()
|
||||
.send_response(peer_id, id, request_id, response.into())
|
||||
.send_response(peer_id, inbound_request_id, response.into())
|
||||
}
|
||||
|
||||
/// Inform the peer that their request produced an error.
|
||||
@@ -1145,15 +1145,13 @@ impl<E: EthSpec> Network<E> {
|
||||
pub fn send_error_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
id: PeerRequestId,
|
||||
request_id: rpc::RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
error: RpcErrorResponse,
|
||||
reason: String,
|
||||
) {
|
||||
self.eth2_rpc_mut().send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
RpcResponse::Error(error, reason.into()),
|
||||
)
|
||||
}
|
||||
@@ -1374,7 +1372,7 @@ impl<E: EthSpec> Network<E> {
|
||||
skip_all
|
||||
)]
|
||||
fn ping(&mut self, peer_id: PeerId) {
|
||||
self.eth2_rpc_mut().ping(peer_id, RequestId::Internal);
|
||||
self.eth2_rpc_mut().ping(peer_id, AppRequestId::Internal);
|
||||
}
|
||||
|
||||
/// Sends a METADATA request to a peer.
|
||||
@@ -1394,7 +1392,7 @@ impl<E: EthSpec> Network<E> {
|
||||
RequestType::MetaData(MetadataRequest::new_v2())
|
||||
};
|
||||
self.eth2_rpc_mut()
|
||||
.send_request(peer_id, RequestId::Internal, event);
|
||||
.send_request(peer_id, AppRequestId::Internal, event);
|
||||
}
|
||||
|
||||
/// Sends a METADATA response to a peer.
|
||||
@@ -1407,15 +1405,14 @@ impl<E: EthSpec> Network<E> {
|
||||
fn send_meta_data_response(
|
||||
&mut self,
|
||||
_req: MetadataRequest<E>,
|
||||
id: PeerRequestId,
|
||||
request_id: rpc::RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
peer_id: PeerId,
|
||||
) {
|
||||
let metadata = self.network_globals.local_metadata.read().clone();
|
||||
// The encoder is responsible for sending the negotiated version of the metadata
|
||||
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
|
||||
self.eth2_rpc_mut()
|
||||
.send_response(peer_id, id, request_id, event);
|
||||
.send_response(peer_id, inbound_request_id, event);
|
||||
}
|
||||
|
||||
// RPC Propagation methods
|
||||
@@ -1429,17 +1426,17 @@ impl<E: EthSpec> Network<E> {
|
||||
)]
|
||||
fn build_response(
|
||||
&mut self,
|
||||
id: RequestId,
|
||||
app_request_id: AppRequestId,
|
||||
peer_id: PeerId,
|
||||
response: Response<E>,
|
||||
) -> Option<NetworkEvent<E>> {
|
||||
match id {
|
||||
RequestId::Application(id) => Some(NetworkEvent::ResponseReceived {
|
||||
match app_request_id {
|
||||
AppRequestId::Internal => None,
|
||||
_ => Some(NetworkEvent::ResponseReceived {
|
||||
peer_id,
|
||||
id,
|
||||
app_request_id,
|
||||
response,
|
||||
}),
|
||||
RequestId::Internal => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1643,7 +1640,7 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
fn inject_rpc_event(&mut self, event: RPCMessage<RequestId, E>) -> Option<NetworkEvent<E>> {
|
||||
fn inject_rpc_event(&mut self, event: RPCMessage<AppRequestId, E>) -> Option<NetworkEvent<E>> {
|
||||
let peer_id = event.peer_id;
|
||||
|
||||
// Do not permit Inbound events from peers that are being disconnected or RPC requests,
|
||||
@@ -1656,7 +1653,6 @@ impl<E: EthSpec> Network<E> {
|
||||
return None;
|
||||
}
|
||||
|
||||
let connection_id = event.conn_id;
|
||||
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
|
||||
match event.message {
|
||||
Err(handler_err) => {
|
||||
@@ -1686,16 +1682,20 @@ impl<E: EthSpec> Network<E> {
|
||||
ConnectionDirection::Outgoing,
|
||||
);
|
||||
// inform failures of requests coming outside the behaviour
|
||||
if let RequestId::Application(id) = id {
|
||||
Some(NetworkEvent::RPCFailed { peer_id, id, error })
|
||||
} else {
|
||||
if let AppRequestId::Internal = id {
|
||||
None
|
||||
} else {
|
||||
Some(NetworkEvent::RPCFailed {
|
||||
peer_id,
|
||||
app_request_id: id,
|
||||
error,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RPCReceived::Request(request)) => {
|
||||
match request.r#type {
|
||||
Ok(RPCReceived::Request(inbound_request_id, request_type)) => {
|
||||
match request_type {
|
||||
/* Behaviour managed protocols: Ping and Metadata */
|
||||
RequestType::Ping(ping) => {
|
||||
// inform the peer manager and send the response
|
||||
@@ -1704,12 +1704,7 @@ impl<E: EthSpec> Network<E> {
|
||||
}
|
||||
RequestType::MetaData(req) => {
|
||||
// send the requested meta-data
|
||||
self.send_meta_data_response(
|
||||
req,
|
||||
(connection_id, request.substream_id),
|
||||
request.id,
|
||||
peer_id,
|
||||
);
|
||||
self.send_meta_data_response(req, inbound_request_id, peer_id);
|
||||
None
|
||||
}
|
||||
RequestType::Goodbye(reason) => {
|
||||
@@ -1734,8 +1729,8 @@ impl<E: EthSpec> Network<E> {
|
||||
// propagate the STATUS message upwards
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::BlocksByRange(ref req) => {
|
||||
@@ -1757,32 +1752,32 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::BlocksByRoot(_) => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"]);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::BlobsByRange(_) => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::BlobsByRoot(_) => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_root"]);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::DataColumnsByRoot(_) => {
|
||||
@@ -1792,8 +1787,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::DataColumnsByRange(_) => {
|
||||
@@ -1803,8 +1798,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::LightClientBootstrap(_) => {
|
||||
@@ -1814,8 +1809,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::LightClientOptimisticUpdate => {
|
||||
@@ -1825,8 +1820,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::LightClientFinalityUpdate => {
|
||||
@@ -1836,8 +1831,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
RequestType::LightClientUpdatesByRange(_) => {
|
||||
@@ -1847,8 +1842,8 @@ impl<E: EthSpec> Network<E> {
|
||||
);
|
||||
Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id: (connection_id, request.substream_id),
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2010,7 +2005,7 @@ impl<E: EthSpec> Network<E> {
|
||||
debug!(%peer_id, %reason, "Peer Manager disconnecting peer");
|
||||
// send one goodbye
|
||||
self.eth2_rpc_mut()
|
||||
.shutdown(peer_id, RequestId::Internal, reason);
|
||||
.shutdown(peer_id, AppRequestId::Internal, reason);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ fn test_tcp_status_rpc() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: AppRequestId::Router,
|
||||
app_request_id: AppRequestId::Router,
|
||||
response,
|
||||
} => {
|
||||
// Should receive the RPC response
|
||||
@@ -118,13 +118,17 @@ fn test_tcp_status_rpc() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
debug!("Receiver Received");
|
||||
receiver.send_response(peer_id, id, request.id, rpc_response.clone());
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
rpc_response.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {} // Ignore other events
|
||||
@@ -204,7 +208,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: _,
|
||||
app_request_id: _,
|
||||
response,
|
||||
} => {
|
||||
warn!("Sender received a response");
|
||||
@@ -240,10 +244,10 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
for i in 0..messages_to_send {
|
||||
@@ -258,16 +262,14 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
|
||||
};
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
rpc_response.clone(),
|
||||
);
|
||||
}
|
||||
// send the stream termination
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
Response::BlocksByRange(None),
|
||||
);
|
||||
}
|
||||
@@ -338,7 +340,7 @@ fn test_blobs_by_range_chunked_rpc() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: _,
|
||||
app_request_id: _,
|
||||
response,
|
||||
} => {
|
||||
warn!("Sender received a response");
|
||||
@@ -368,10 +370,10 @@ fn test_blobs_by_range_chunked_rpc() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
for _ in 0..messages_to_send {
|
||||
@@ -379,16 +381,14 @@ fn test_blobs_by_range_chunked_rpc() {
|
||||
// second as altair and third as bellatrix.
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
rpc_response.clone(),
|
||||
);
|
||||
}
|
||||
// send the stream termination
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
Response::BlobsByRange(None),
|
||||
);
|
||||
}
|
||||
@@ -459,8 +459,8 @@ fn test_tcp_blocks_by_range_over_limit() {
|
||||
.unwrap();
|
||||
}
|
||||
// The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE
|
||||
NetworkEvent::RPCFailed { id, .. } => {
|
||||
assert!(matches!(id, AppRequestId::Router));
|
||||
NetworkEvent::RPCFailed { app_request_id, .. } => {
|
||||
assert!(matches!(app_request_id, AppRequestId::Router));
|
||||
return;
|
||||
}
|
||||
_ => {} // Ignore other behaviour events
|
||||
@@ -474,26 +474,24 @@ fn test_tcp_blocks_by_range_over_limit() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
for _ in 0..messages_to_send {
|
||||
let rpc_response = rpc_response_bellatrix_large.clone();
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
rpc_response.clone(),
|
||||
);
|
||||
}
|
||||
// send the stream termination
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
Response::BlocksByRange(None),
|
||||
);
|
||||
}
|
||||
@@ -566,7 +564,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: _,
|
||||
app_request_id: _,
|
||||
response,
|
||||
} =>
|
||||
// Should receive the RPC response
|
||||
@@ -608,15 +606,15 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
|
||||
futures::future::Either::Left((
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
},
|
||||
_,
|
||||
)) => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
message_info = Some((peer_id, id, request.id));
|
||||
message_info = Some((peer_id, inbound_request_id));
|
||||
}
|
||||
}
|
||||
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
|
||||
@@ -626,8 +624,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
|
||||
// if we need to send messages send them here. This will happen after a delay
|
||||
if message_info.is_some() {
|
||||
messages_sent += 1;
|
||||
let (peer_id, stream_id, request_id) = message_info.as_ref().unwrap();
|
||||
receiver.send_response(*peer_id, *stream_id, *request_id, rpc_response.clone());
|
||||
let (peer_id, inbound_request_id) = message_info.as_ref().unwrap();
|
||||
receiver.send_response(*peer_id, *inbound_request_id, rpc_response.clone());
|
||||
debug!("Sending message {}", messages_sent);
|
||||
if messages_sent == messages_to_send + extra_messages_to_send {
|
||||
// stop sending messages
|
||||
@@ -700,7 +698,7 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: AppRequestId::Router,
|
||||
app_request_id: AppRequestId::Router,
|
||||
response,
|
||||
} => match response {
|
||||
Response::BlocksByRange(Some(_)) => {
|
||||
@@ -727,26 +725,24 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
|
||||
for _ in 1..=messages_to_send {
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
rpc_response.clone(),
|
||||
);
|
||||
}
|
||||
// send the stream termination
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
Response::BlocksByRange(None),
|
||||
);
|
||||
}
|
||||
@@ -837,7 +833,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: AppRequestId::Router,
|
||||
app_request_id: AppRequestId::Router,
|
||||
response,
|
||||
} => match response {
|
||||
Response::BlocksByRoot(Some(_)) => {
|
||||
@@ -870,10 +866,10 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
|
||||
match receiver.next_event().await {
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
debug!("Receiver got request");
|
||||
|
||||
@@ -886,14 +882,13 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
|
||||
} else {
|
||||
rpc_response_bellatrix_small.clone()
|
||||
};
|
||||
receiver.send_response(peer_id, id, request.id, rpc_response);
|
||||
receiver.send_response(peer_id, inbound_request_id, rpc_response);
|
||||
debug!("Sending message");
|
||||
}
|
||||
// send the stream termination
|
||||
receiver.send_response(
|
||||
peer_id,
|
||||
id,
|
||||
request.id,
|
||||
inbound_request_id,
|
||||
Response::BlocksByRange(None),
|
||||
);
|
||||
debug!("Send stream term");
|
||||
@@ -977,7 +972,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id: _,
|
||||
id: AppRequestId::Router,
|
||||
app_request_id: AppRequestId::Router,
|
||||
response,
|
||||
} => {
|
||||
debug!("Sender received a response");
|
||||
@@ -1019,15 +1014,15 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
futures::future::Either::Left((
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
},
|
||||
_,
|
||||
)) => {
|
||||
if request.r#type == rpc_request {
|
||||
if request_type == rpc_request {
|
||||
// send the response
|
||||
warn!("Receiver got request");
|
||||
message_info = Some((peer_id, id, request.id));
|
||||
message_info = Some((peer_id, inbound_request_id));
|
||||
}
|
||||
}
|
||||
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
|
||||
@@ -1037,8 +1032,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
// if we need to send messages send them here. This will happen after a delay
|
||||
if message_info.is_some() {
|
||||
messages_sent += 1;
|
||||
let (peer_id, stream_id, request_id) = message_info.as_ref().unwrap();
|
||||
receiver.send_response(*peer_id, *stream_id, *request_id, rpc_response.clone());
|
||||
let (peer_id, inbound_request_id) = message_info.as_ref().unwrap();
|
||||
receiver.send_response(*peer_id, *inbound_request_id, rpc_response.clone());
|
||||
debug!("Sending message {}", messages_sent);
|
||||
if messages_sent == messages_to_send + extra_messages_to_send {
|
||||
// stop sending messages
|
||||
|
||||
@@ -15,12 +15,11 @@ use beacon_processor::{
|
||||
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache,
|
||||
GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent,
|
||||
};
|
||||
use lighthouse_network::discovery::ConnectionId;
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||
LightClientUpdatesByRangeRequest,
|
||||
};
|
||||
use lighthouse_network::rpc::{RequestId, SubstreamId};
|
||||
use lighthouse_network::rpc::InboundRequestId;
|
||||
use lighthouse_network::{
|
||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
|
||||
Client, MessageId, NetworkGlobals, PeerId, PubsubMessage,
|
||||
@@ -647,21 +646,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_blocks_by_range_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId, // Use ResponseId here
|
||||
request: BlocksByRangeRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = async move {
|
||||
processor
|
||||
.handle_blocks_by_range_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
.handle_blocks_by_range_request(peer_id, inbound_request_id, request)
|
||||
.await;
|
||||
};
|
||||
|
||||
@@ -675,21 +666,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_blocks_by_roots_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId, // Use ResponseId here
|
||||
request: BlocksByRootRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = async move {
|
||||
processor
|
||||
.handle_blocks_by_root_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
.handle_blocks_by_root_request(peer_id, inbound_request_id, request)
|
||||
.await;
|
||||
};
|
||||
|
||||
@@ -703,21 +686,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_blobs_by_range_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlobsByRangeRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_blobs_by_range_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
};
|
||||
let process_fn =
|
||||
move || processor.handle_blobs_by_range_request(peer_id, inbound_request_id, request);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
@@ -729,21 +703,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_blobs_by_roots_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlobsByRootRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_blobs_by_root_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
};
|
||||
let process_fn =
|
||||
move || processor.handle_blobs_by_root_request(peer_id, inbound_request_id, request);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
@@ -755,20 +720,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_data_columns_by_roots_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: DataColumnsByRootRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_data_columns_by_root_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
processor.handle_data_columns_by_root_request(peer_id, inbound_request_id, request)
|
||||
};
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
@@ -781,20 +738,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_data_columns_by_range_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: DataColumnsByRangeRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_data_columns_by_range_request(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
processor.handle_data_columns_by_range_request(peer_id, inbound_request_id, request)
|
||||
};
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
@@ -807,21 +756,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_light_client_bootstrap_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: LightClientBootstrapRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_light_client_bootstrap(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
};
|
||||
let process_fn =
|
||||
move || processor.handle_light_client_bootstrap(peer_id, inbound_request_id, request);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: true,
|
||||
@@ -833,19 +773,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_light_client_optimistic_update_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_light_client_optimistic_update(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
)
|
||||
};
|
||||
let process_fn =
|
||||
move || processor.handle_light_client_optimistic_update(peer_id, inbound_request_id);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: true,
|
||||
@@ -857,19 +789,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_light_client_finality_update_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_light_client_finality_update(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
)
|
||||
};
|
||||
let process_fn =
|
||||
move || processor.handle_light_client_finality_update(peer_id, inbound_request_id);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: true,
|
||||
@@ -881,20 +805,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_light_client_updates_by_range_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: LightClientUpdatesByRangeRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = move || {
|
||||
processor.handle_light_client_updates_by_range(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
processor.handle_light_client_updates_by_range(peer_id, inbound_request_id, request)
|
||||
};
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
|
||||
@@ -4,12 +4,11 @@ use crate::status::ToStatusMessage;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
||||
use itertools::process_results;
|
||||
use lighthouse_network::discovery::ConnectionId;
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||
};
|
||||
use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
|
||||
use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo};
|
||||
use methods::LightClientUpdatesByRangeRequest;
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::{hash_map::Entry, HashMap};
|
||||
@@ -34,15 +33,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_response(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
response: Response<T::EthSpec>,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
) {
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
request_id,
|
||||
id: (connection_id, substream_id),
|
||||
inbound_request_id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
@@ -52,15 +48,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
peer_id: PeerId,
|
||||
error: RpcErrorResponse,
|
||||
reason: String,
|
||||
id: PeerRequestId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
) {
|
||||
self.send_network_message(NetworkMessage::SendErrorResponse {
|
||||
peer_id,
|
||||
error,
|
||||
reason,
|
||||
id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -161,24 +155,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn handle_blocks_by_root_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
self.clone()
|
||||
.handle_blocks_by_root_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
)
|
||||
.handle_blocks_by_root_request_inner(peer_id, inbound_request_id, request)
|
||||
.await,
|
||||
Response::BlocksByRoot,
|
||||
);
|
||||
@@ -188,9 +172,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn handle_blocks_by_root_request_inner(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
let log_results = |peer_id, requested_blocks, send_block_count| {
|
||||
@@ -220,10 +202,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
Ok(Some(block)) => {
|
||||
self.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
Response::BlocksByRoot(Some(block.clone())),
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
);
|
||||
send_block_count += 1;
|
||||
}
|
||||
@@ -265,23 +245,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_blobs_by_root_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlobsByRootRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
self.handle_blobs_by_root_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
),
|
||||
inbound_request_id,
|
||||
self.handle_blobs_by_root_request_inner(peer_id, inbound_request_id, request),
|
||||
Response::BlobsByRoot,
|
||||
);
|
||||
}
|
||||
@@ -290,9 +260,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_blobs_by_root_request_inner(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: BlobsByRootRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root)
|
||||
@@ -314,10 +282,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
|
||||
self.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
Response::BlobsByRoot(Some(blob)),
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
);
|
||||
send_blob_count += 1;
|
||||
} else {
|
||||
@@ -339,10 +305,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
if blob_sidecar.index == *index {
|
||||
self.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
Response::BlobsByRoot(Some(blob_sidecar.clone())),
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
);
|
||||
send_blob_count += 1;
|
||||
break 'inner;
|
||||
@@ -375,23 +339,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_data_columns_by_root_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: DataColumnsByRootRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
self.handle_data_columns_by_root_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
request,
|
||||
),
|
||||
inbound_request_id,
|
||||
self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request),
|
||||
Response::DataColumnsByRoot,
|
||||
);
|
||||
}
|
||||
@@ -400,9 +354,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_data_columns_by_root_request_inner(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: DataColumnsByRootRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
let mut send_data_column_count = 0;
|
||||
@@ -416,10 +368,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
send_data_column_count += 1;
|
||||
self.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
Response::DataColumnsByRoot(Some(data_column)),
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
);
|
||||
}
|
||||
Ok(None) => {} // no-op
|
||||
@@ -449,22 +399,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_light_client_updates_by_range(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: LightClientUpdatesByRangeRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
self.clone()
|
||||
.handle_light_client_updates_by_range_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
request,
|
||||
),
|
||||
Response::LightClientUpdatesByRange,
|
||||
@@ -475,9 +419,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_light_client_updates_by_range_request_inner(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: LightClientUpdatesByRangeRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
debug!(
|
||||
@@ -516,8 +458,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
response: Response::LightClientUpdatesByRange(Some(Arc::new(lc_update.clone()))),
|
||||
request_id,
|
||||
id: (connection_id, substream_id),
|
||||
inbound_request_id,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -549,16 +490,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_light_client_bootstrap(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request: LightClientBootstrapRequest,
|
||||
) {
|
||||
self.terminate_response_single_item(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
match self.chain.get_light_client_bootstrap(&request.root) {
|
||||
Ok(Some((bootstrap, _))) => Ok(Arc::new(bootstrap)),
|
||||
Ok(None) => Err((
|
||||
@@ -583,15 +520,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_light_client_optimistic_update(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
) {
|
||||
self.terminate_response_single_item(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
match self
|
||||
.chain
|
||||
.light_client_server_cache
|
||||
@@ -611,15 +544,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_light_client_finality_update(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
) {
|
||||
self.terminate_response_single_item(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
match self
|
||||
.chain
|
||||
.light_client_server_cache
|
||||
@@ -639,24 +568,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn handle_blocks_by_range_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
self.clone()
|
||||
.handle_blocks_by_range_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
req,
|
||||
)
|
||||
.handle_blocks_by_range_request_inner(peer_id, inbound_request_id, req)
|
||||
.await,
|
||||
Response::BlocksByRange,
|
||||
);
|
||||
@@ -666,9 +585,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn handle_blocks_by_range_request_inner(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
debug!(
|
||||
@@ -789,9 +706,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
blocks_sent += 1;
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
response: Response::BlocksByRange(Some(block.clone())),
|
||||
id: (connection_id, substream_id),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -852,23 +768,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_blobs_by_range_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: BlobsByRangeRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
self.handle_blobs_by_range_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
req,
|
||||
),
|
||||
inbound_request_id,
|
||||
self.handle_blobs_by_range_request_inner(peer_id, inbound_request_id, req),
|
||||
Response::BlobsByRange,
|
||||
);
|
||||
}
|
||||
@@ -877,9 +783,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
fn handle_blobs_by_range_request_inner(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: BlobsByRangeRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
debug!(
|
||||
@@ -1016,9 +920,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
blobs_sent += 1;
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
response: Response::BlobsByRange(Some(blob_sidecar.clone())),
|
||||
request_id,
|
||||
id: (connection_id, substream_id),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1048,23 +951,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_data_columns_by_range_request(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: DataColumnsByRangeRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
self.handle_data_columns_by_range_request_inner(
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
request_id,
|
||||
req,
|
||||
),
|
||||
inbound_request_id,
|
||||
self.handle_data_columns_by_range_request_inner(peer_id, inbound_request_id, req),
|
||||
Response::DataColumnsByRange,
|
||||
);
|
||||
}
|
||||
@@ -1073,9 +966,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn handle_data_columns_by_range_request_inner(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: DataColumnsByRangeRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
debug!(
|
||||
@@ -1205,11 +1096,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
data_columns_sent += 1;
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
response: Response::DataColumnsByRange(Some(
|
||||
data_column_sidecar.clone(),
|
||||
)),
|
||||
id: (connection_id, substream_id),
|
||||
});
|
||||
}
|
||||
Ok(None) => {} // no-op
|
||||
@@ -1252,32 +1142,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
fn terminate_response_single_item<R, F: Fn(R) -> Response<T::EthSpec>>(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
result: Result<R, (RpcErrorResponse, String)>,
|
||||
into_response: F,
|
||||
) {
|
||||
match result {
|
||||
Ok(resp) => {
|
||||
// Not necessary to explicitly send a termination message if this InboundRequest
|
||||
// returns <= 1 for InboundRequest::expected_responses
|
||||
// https://github.com/sigp/lighthouse/blob/3058b96f2560f1da04ada4f9d8ba8e5651794ff6/beacon_node/lighthouse_network/src/rpc/handler.rs#L555-L558
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
response: into_response(resp),
|
||||
id: (connection_id, substream_id),
|
||||
});
|
||||
}
|
||||
Err((error_code, reason)) => {
|
||||
self.send_error_response(
|
||||
peer_id,
|
||||
error_code,
|
||||
reason,
|
||||
(connection_id, substream_id),
|
||||
request_id,
|
||||
);
|
||||
self.send_error_response(peer_id, error_code, reason, inbound_request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1287,27 +1165,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
fn terminate_response_stream<R, F: FnOnce(Option<R>) -> Response<T::EthSpec>>(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
result: Result<(), (RpcErrorResponse, &'static str)>,
|
||||
into_response: F,
|
||||
) {
|
||||
match result {
|
||||
Ok(_) => self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
response: into_response(None),
|
||||
id: (connection_id, substream_id),
|
||||
}),
|
||||
Err((error_code, reason)) => {
|
||||
self.send_error_response(
|
||||
peer_id,
|
||||
error_code,
|
||||
reason.into(),
|
||||
(connection_id, substream_id),
|
||||
request_id,
|
||||
);
|
||||
self.send_error_response(peer_id, error_code, reason.into(), inbound_request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,9 +14,8 @@ use beacon_chain::test_utils::{
|
||||
};
|
||||
use beacon_chain::{BeaconChain, WhenSlotSkipped};
|
||||
use beacon_processor::{work_reprocessing_queue::*, *};
|
||||
use lighthouse_network::discovery::ConnectionId;
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
|
||||
use lighthouse_network::rpc::{RequestId, SubstreamId};
|
||||
use lighthouse_network::rpc::InboundRequestId;
|
||||
use lighthouse_network::{
|
||||
discv5::enr::{self, CombinedKey},
|
||||
rpc::methods::{MetaData, MetaDataV2},
|
||||
@@ -366,9 +365,7 @@ impl TestRig {
|
||||
self.network_beacon_processor
|
||||
.send_blobs_by_range_request(
|
||||
PeerId::random(),
|
||||
ConnectionId::new_unchecked(42),
|
||||
SubstreamId::new(24),
|
||||
RequestId::new_unchecked(0),
|
||||
InboundRequestId::new_unchecked(42, 24),
|
||||
BlobsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count,
|
||||
@@ -1149,8 +1146,7 @@ async fn test_blobs_by_range() {
|
||||
if let NetworkMessage::SendResponse {
|
||||
peer_id: _,
|
||||
response: Response::BlobsByRange(blob),
|
||||
id: _,
|
||||
request_id: _,
|
||||
inbound_request_id: _,
|
||||
} = next
|
||||
{
|
||||
if blob.is_some() {
|
||||
|
||||
@@ -14,12 +14,10 @@ use beacon_processor::{
|
||||
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache,
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::discovery::ConnectionId;
|
||||
use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{
|
||||
rpc,
|
||||
service::api_types::{AppRequestId, SyncRequestId},
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Response,
|
||||
MessageId, NetworkGlobals, PeerId, PubsubMessage, Response,
|
||||
};
|
||||
use logging::crit;
|
||||
use logging::TimeLatch;
|
||||
@@ -54,19 +52,19 @@ pub enum RouterMessage<E: EthSpec> {
|
||||
/// An RPC request has been received.
|
||||
RPCRequestReceived {
|
||||
peer_id: PeerId,
|
||||
id: PeerRequestId,
|
||||
request: rpc::Request<E>,
|
||||
inbound_request_id: InboundRequestId,
|
||||
request_type: RequestType<E>,
|
||||
},
|
||||
/// An RPC response has been received.
|
||||
RPCResponseReceived {
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
response: Response<E>,
|
||||
},
|
||||
/// An RPC request failed
|
||||
RPCFailed {
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
error: RPCError,
|
||||
},
|
||||
/// A gossip message has been received. The fields are: message id, the peer that sent us this
|
||||
@@ -159,24 +157,24 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
}
|
||||
RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
self.handle_rpc_request(peer_id, id, request);
|
||||
self.handle_rpc_request(peer_id, inbound_request_id, request_type);
|
||||
}
|
||||
RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
request_id,
|
||||
app_request_id,
|
||||
response,
|
||||
} => {
|
||||
self.handle_rpc_response(peer_id, request_id, response);
|
||||
self.handle_rpc_response(peer_id, app_request_id, response);
|
||||
}
|
||||
RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id,
|
||||
app_request_id,
|
||||
error,
|
||||
} => {
|
||||
self.on_rpc_error(peer_id, request_id, error);
|
||||
self.on_rpc_error(peer_id, app_request_id, error);
|
||||
}
|
||||
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
|
||||
self.handle_gossip(id, peer_id, gossip, should_process);
|
||||
@@ -190,23 +188,18 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
fn handle_rpc_request<E: EthSpec>(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
rpc_request: rpc::Request<E>,
|
||||
inbound_request_id: InboundRequestId, // Use ResponseId here
|
||||
request_type: RequestType<E>,
|
||||
) {
|
||||
if !self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
debug!( %peer_id, request = ?rpc_request, "Dropping request of disconnected peer");
|
||||
debug!(%peer_id, request = ?request_type, "Dropping request of disconnected peer");
|
||||
return;
|
||||
}
|
||||
match rpc_request.r#type {
|
||||
RequestType::Status(status_message) => self.on_status_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
status_message,
|
||||
),
|
||||
match request_type {
|
||||
RequestType::Status(status_message) => {
|
||||
self.on_status_request(peer_id, inbound_request_id, status_message)
|
||||
}
|
||||
RequestType::BlocksByRange(request) => {
|
||||
// return just one block in case the step parameter is used. https://github.com/ethereum/consensus-specs/pull/2856
|
||||
let mut count = *request.count();
|
||||
if *request.step() > 1 {
|
||||
count = 1;
|
||||
@@ -223,9 +216,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor.send_blocks_by_range_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
inbound_request_id,
|
||||
blocks_request,
|
||||
),
|
||||
)
|
||||
@@ -233,86 +224,50 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
RequestType::BlocksByRoot(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor.send_blocks_by_roots_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
inbound_request_id,
|
||||
request,
|
||||
),
|
||||
),
|
||||
RequestType::BlobsByRange(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor.send_blobs_by_range_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
inbound_request_id,
|
||||
request,
|
||||
),
|
||||
),
|
||||
RequestType::BlobsByRoot(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor.send_blobs_by_roots_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
inbound_request_id,
|
||||
request,
|
||||
),
|
||||
),
|
||||
RequestType::DataColumnsByRoot(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_data_columns_by_roots_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
request,
|
||||
),
|
||||
.send_data_columns_by_roots_request(peer_id, inbound_request_id, request),
|
||||
),
|
||||
RequestType::DataColumnsByRange(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_data_columns_by_range_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
request,
|
||||
),
|
||||
.send_data_columns_by_range_request(peer_id, inbound_request_id, request),
|
||||
),
|
||||
RequestType::LightClientBootstrap(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_light_client_bootstrap_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
request,
|
||||
),
|
||||
.send_light_client_bootstrap_request(peer_id, inbound_request_id, request),
|
||||
),
|
||||
RequestType::LightClientOptimisticUpdate => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_light_client_optimistic_update_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
),
|
||||
.send_light_client_optimistic_update_request(peer_id, inbound_request_id),
|
||||
),
|
||||
RequestType::LightClientFinalityUpdate => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_light_client_finality_update_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
),
|
||||
.send_light_client_finality_update_request(peer_id, inbound_request_id),
|
||||
),
|
||||
RequestType::LightClientUpdatesByRange(request) => self
|
||||
.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_light_client_updates_by_range_request(
|
||||
peer_id,
|
||||
request_id.0,
|
||||
request_id.1,
|
||||
rpc_request.id,
|
||||
inbound_request_id,
|
||||
request,
|
||||
),
|
||||
),
|
||||
@@ -324,7 +279,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
fn handle_rpc_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
response: Response<T::EthSpec>,
|
||||
) {
|
||||
match response {
|
||||
@@ -336,22 +291,22 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
)
|
||||
}
|
||||
Response::BlocksByRange(beacon_block) => {
|
||||
self.on_blocks_by_range_response(peer_id, request_id, beacon_block);
|
||||
self.on_blocks_by_range_response(peer_id, app_request_id, beacon_block);
|
||||
}
|
||||
Response::BlocksByRoot(beacon_block) => {
|
||||
self.on_blocks_by_root_response(peer_id, request_id, beacon_block);
|
||||
self.on_blocks_by_root_response(peer_id, app_request_id, beacon_block);
|
||||
}
|
||||
Response::BlobsByRange(blob) => {
|
||||
self.on_blobs_by_range_response(peer_id, request_id, blob);
|
||||
self.on_blobs_by_range_response(peer_id, app_request_id, blob);
|
||||
}
|
||||
Response::BlobsByRoot(blob) => {
|
||||
self.on_blobs_by_root_response(peer_id, request_id, blob);
|
||||
self.on_blobs_by_root_response(peer_id, app_request_id, blob);
|
||||
}
|
||||
Response::DataColumnsByRoot(data_column) => {
|
||||
self.on_data_columns_by_root_response(peer_id, request_id, data_column);
|
||||
self.on_data_columns_by_root_response(peer_id, app_request_id, data_column);
|
||||
}
|
||||
Response::DataColumnsByRange(data_column) => {
|
||||
self.on_data_columns_by_range_response(peer_id, request_id, data_column);
|
||||
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
|
||||
}
|
||||
// Light client responses should not be received
|
||||
Response::LightClientBootstrap(_)
|
||||
@@ -563,12 +518,12 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
|
||||
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
|
||||
/// this function notifies the sync manager of the error.
|
||||
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: AppRequestId, error: RPCError) {
|
||||
pub fn on_rpc_error(&mut self, peer_id: PeerId, app_request_id: AppRequestId, error: RPCError) {
|
||||
// Check if the failed RPC belongs to sync
|
||||
if let AppRequestId::Sync(request_id) = request_id {
|
||||
if let AppRequestId::Sync(sync_request_id) = app_request_id {
|
||||
self.send_to_sync(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
@@ -580,9 +535,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_status_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId, // Use ResponseId here
|
||||
status: StatusMessage,
|
||||
) {
|
||||
debug!(%peer_id, ?status, "Received Status Request");
|
||||
@@ -590,9 +543,8 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
// Say status back.
|
||||
self.network.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
Response::Status(status_message(&self.chain)),
|
||||
(connection_id, substream_id),
|
||||
request_id,
|
||||
);
|
||||
|
||||
self.handle_beacon_processor_send_result(
|
||||
@@ -606,11 +558,11 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_blocks_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_request_id) => match sync_request_id {
|
||||
id @ SyncRequestId::BlocksByRange { .. } => id,
|
||||
other => {
|
||||
crit!(request = ?other, "BlocksByRange response on incorrect request");
|
||||
@@ -621,6 +573,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
crit!(%peer_id, "All BBRange requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
@@ -631,7 +584,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
@@ -640,7 +593,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_blobs_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
trace!(
|
||||
@@ -648,10 +601,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received BlobsByRange Response"
|
||||
);
|
||||
|
||||
if let AppRequestId::Sync(id) = request_id {
|
||||
if let AppRequestId::Sync(sync_request_id) = app_request_id {
|
||||
self.send_to_sync(SyncMessage::RpcBlob {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
sync_request_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
@@ -664,10 +617,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_blocks_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SingleBlock { .. } => id,
|
||||
other => {
|
||||
@@ -679,6 +632,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
crit!(%peer_id, "All BBRoot requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
@@ -687,7 +641,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
@@ -697,10 +651,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_blobs_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SingleBlob { .. } => id,
|
||||
other => {
|
||||
@@ -712,6 +666,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
crit!(%peer_id, "All BlobsByRoot requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
@@ -719,7 +674,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received BlobsByRoot Response"
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlob {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: timestamp_now(),
|
||||
@@ -730,10 +685,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_data_columns_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::DataColumnsByRoot { .. } => id,
|
||||
other => {
|
||||
@@ -745,6 +700,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
crit!(%peer_id, "All DataColumnsByRoot requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
@@ -752,7 +708,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received DataColumnsByRoot Response"
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
data_column,
|
||||
seen_timestamp: timestamp_now(),
|
||||
@@ -762,7 +718,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_data_columns_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
trace!(
|
||||
@@ -770,10 +726,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received DataColumnsByRange Response"
|
||||
);
|
||||
|
||||
if let AppRequestId::Sync(id) = request_id {
|
||||
if let AppRequestId::Sync(sync_request_id) = app_request_id {
|
||||
self.send_to_sync(SyncMessage::RpcDataColumn {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
sync_request_id,
|
||||
data_column,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
@@ -824,7 +780,7 @@ impl<E: EthSpec> HandlerNetworkContext<E> {
|
||||
pub fn send_processor_request(&mut self, peer_id: PeerId, request: RequestType<E>) {
|
||||
self.inform_network(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: AppRequestId::Router,
|
||||
app_request_id: AppRequestId::Router,
|
||||
request,
|
||||
})
|
||||
}
|
||||
@@ -833,14 +789,12 @@ impl<E: EthSpec> HandlerNetworkContext<E> {
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
response: Response<E>,
|
||||
id: PeerRequestId,
|
||||
request_id: RequestId,
|
||||
) {
|
||||
self.inform_network(NetworkMessage::SendResponse {
|
||||
request_id,
|
||||
peer_id,
|
||||
id,
|
||||
inbound_request_id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -10,14 +10,15 @@ use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconPro
|
||||
use futures::channel::mpsc::Sender;
|
||||
use futures::future::OptionFuture;
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::rpc::{RequestId, RequestType};
|
||||
use lighthouse_network::rpc::InboundRequestId;
|
||||
use lighthouse_network::rpc::RequestType;
|
||||
use lighthouse_network::service::Network;
|
||||
use lighthouse_network::types::GossipKind;
|
||||
use lighthouse_network::Enr;
|
||||
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
|
||||
use lighthouse_network::{
|
||||
rpc::{GoodbyeReason, RpcErrorResponse},
|
||||
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Response, Subnet,
|
||||
Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet,
|
||||
};
|
||||
use lighthouse_network::{
|
||||
service::api_types::AppRequestId,
|
||||
@@ -61,22 +62,20 @@ pub enum NetworkMessage<E: EthSpec> {
|
||||
SendRequest {
|
||||
peer_id: PeerId,
|
||||
request: RequestType<E>,
|
||||
request_id: AppRequestId,
|
||||
app_request_id: AppRequestId,
|
||||
},
|
||||
/// Send a successful Response to the libp2p service.
|
||||
SendResponse {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
response: Response<E>,
|
||||
id: PeerRequestId,
|
||||
},
|
||||
/// Sends an error response to an RPC request.
|
||||
SendErrorResponse {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
error: RpcErrorResponse,
|
||||
reason: String,
|
||||
id: PeerRequestId,
|
||||
},
|
||||
/// Publish a list of messages to the gossipsub protocol.
|
||||
Publish { messages: Vec<PubsubMessage<E>> },
|
||||
@@ -488,30 +487,34 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
}
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
});
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id,
|
||||
id,
|
||||
app_request_id,
|
||||
response,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
app_request_id,
|
||||
response,
|
||||
});
|
||||
}
|
||||
NetworkEvent::RPCFailed { id, peer_id, error } => {
|
||||
NetworkEvent::RPCFailed {
|
||||
app_request_id,
|
||||
peer_id,
|
||||
error,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
app_request_id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
@@ -601,35 +604,34 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
app_request_id,
|
||||
} => {
|
||||
if let Err((request_id, error)) =
|
||||
self.libp2p.send_request(peer_id, request_id, request)
|
||||
if let Err((app_request_id, error)) =
|
||||
self.libp2p.send_request(peer_id, app_request_id, request)
|
||||
{
|
||||
self.send_to_router(RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id,
|
||||
app_request_id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
response,
|
||||
id,
|
||||
request_id,
|
||||
} => {
|
||||
self.libp2p.send_response(peer_id, id, request_id, response);
|
||||
self.libp2p
|
||||
.send_response(peer_id, inbound_request_id, response);
|
||||
}
|
||||
NetworkMessage::SendErrorResponse {
|
||||
peer_id,
|
||||
error,
|
||||
id,
|
||||
request_id,
|
||||
inbound_request_id,
|
||||
reason,
|
||||
} => {
|
||||
self.libp2p
|
||||
.send_error_response(peer_id, id, request_id, error, reason);
|
||||
.send_error_response(peer_id, inbound_request_id, error, reason);
|
||||
}
|
||||
NetworkMessage::ValidationResult {
|
||||
propagation_source,
|
||||
|
||||
@@ -108,7 +108,7 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
|
||||
/// A block has been received from the RPC.
|
||||
RpcBlock {
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<E>>>,
|
||||
seen_timestamp: Duration,
|
||||
@@ -116,7 +116,7 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
|
||||
/// A blob has been received from the RPC.
|
||||
RpcBlob {
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
blob_sidecar: Option<Arc<BlobSidecar<E>>>,
|
||||
seen_timestamp: Duration,
|
||||
@@ -124,7 +124,7 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
|
||||
/// A data columns has been received from the RPC
|
||||
RpcDataColumn {
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
data_column: Option<Arc<DataColumnSidecar<E>>>,
|
||||
seen_timestamp: Duration,
|
||||
@@ -153,7 +153,7 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
/// An RPC Error has occurred on a request.
|
||||
RpcError {
|
||||
peer_id: PeerId,
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
error: RPCError,
|
||||
},
|
||||
|
||||
@@ -477,9 +477,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
|
||||
/// Handles RPC errors related to requests that were emitted from the sync manager.
|
||||
fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) {
|
||||
fn inject_error(&mut self, peer_id: PeerId, sync_request_id: SyncRequestId, error: RPCError) {
|
||||
trace!("Sync manager received a failed RPC");
|
||||
match request_id {
|
||||
match sync_request_id {
|
||||
SyncRequestId::SingleBlock { id } => {
|
||||
self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
@@ -509,8 +509,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
fn peer_disconnect(&mut self, peer_id: &PeerId) {
|
||||
// Inject a Disconnected error on all requests associated with the disconnected peer
|
||||
// to retry all batches/lookups
|
||||
for request_id in self.network.peer_disconnected(peer_id) {
|
||||
self.inject_error(*peer_id, request_id, RPCError::Disconnected);
|
||||
for sync_request_id in self.network.peer_disconnected(peer_id) {
|
||||
self.inject_error(*peer_id, sync_request_id, RPCError::Disconnected);
|
||||
}
|
||||
|
||||
// Remove peer from all data structures
|
||||
@@ -751,25 +751,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.add_peers_force_range_sync(&peers, head_root, head_slot);
|
||||
}
|
||||
SyncMessage::RpcBlock {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
beacon_block,
|
||||
seen_timestamp,
|
||||
} => {
|
||||
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
|
||||
self.rpc_block_received(sync_request_id, peer_id, beacon_block, seen_timestamp);
|
||||
}
|
||||
SyncMessage::RpcBlob {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp,
|
||||
} => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp),
|
||||
} => self.rpc_blob_received(sync_request_id, peer_id, blob_sidecar, seen_timestamp),
|
||||
SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
data_column,
|
||||
seen_timestamp,
|
||||
} => self.rpc_data_column_received(request_id, peer_id, data_column, seen_timestamp),
|
||||
} => {
|
||||
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
|
||||
}
|
||||
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
||||
let block_slot = block.slot();
|
||||
let parent_root = block.parent_root();
|
||||
@@ -845,9 +847,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
error,
|
||||
} => self.inject_error(peer_id, request_id, error),
|
||||
} => self.inject_error(peer_id, sync_request_id, error),
|
||||
SyncMessage::BlockComponentProcessed {
|
||||
process_type,
|
||||
result,
|
||||
@@ -1018,12 +1020,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
|
||||
fn rpc_block_received(
|
||||
&mut self,
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match request_id {
|
||||
match sync_request_id {
|
||||
SyncRequestId::SingleBlock { id } => self.on_single_block_response(
|
||||
id,
|
||||
peer_id,
|
||||
@@ -1060,12 +1062,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
|
||||
fn rpc_blob_received(
|
||||
&mut self,
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match request_id {
|
||||
match sync_request_id {
|
||||
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
|
||||
id,
|
||||
peer_id,
|
||||
@@ -1084,12 +1086,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
|
||||
fn rpc_data_column_received(
|
||||
&mut self,
|
||||
request_id: SyncRequestId,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match request_id {
|
||||
match sync_request_id {
|
||||
SyncRequestId::DataColumnsByRoot(req_id) => {
|
||||
self.on_data_columns_by_root_response(
|
||||
req_id,
|
||||
|
||||
@@ -372,11 +372,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
);
|
||||
|
||||
let request = RequestType::Status(status_message.clone());
|
||||
let request_id = AppRequestId::Router;
|
||||
let app_request_id = AppRequestId::Router;
|
||||
let _ = self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
app_request_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -595,7 +595,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
@@ -684,7 +684,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
@@ -733,7 +733,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
|
||||
})?;
|
||||
|
||||
debug!(
|
||||
@@ -839,7 +839,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlocksByRange(request.clone().into()),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
@@ -880,7 +880,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlobsByRange(request.clone()),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
@@ -919,7 +919,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::DataColumnsByRange(request.clone()),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||
|
||||
|
||||
@@ -460,7 +460,7 @@ impl TestRig {
|
||||
) {
|
||||
self.log("parent_lookup_block_response");
|
||||
self.send_sync_message(SyncMessage::RpcBlock {
|
||||
request_id: SyncRequestId::SingleBlock { id },
|
||||
sync_request_id: SyncRequestId::SingleBlock { id },
|
||||
peer_id,
|
||||
beacon_block,
|
||||
seen_timestamp: D,
|
||||
@@ -475,7 +475,7 @@ impl TestRig {
|
||||
) {
|
||||
self.log("single_lookup_block_response");
|
||||
self.send_sync_message(SyncMessage::RpcBlock {
|
||||
request_id: SyncRequestId::SingleBlock { id },
|
||||
sync_request_id: SyncRequestId::SingleBlock { id },
|
||||
peer_id,
|
||||
beacon_block,
|
||||
seen_timestamp: D,
|
||||
@@ -493,7 +493,7 @@ impl TestRig {
|
||||
blob_sidecar.as_ref().map(|b| b.index)
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcBlob {
|
||||
request_id: SyncRequestId::SingleBlob { id },
|
||||
sync_request_id: SyncRequestId::SingleBlob { id },
|
||||
peer_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: D,
|
||||
@@ -507,7 +507,7 @@ impl TestRig {
|
||||
blob_sidecar: Option<Arc<BlobSidecar<E>>>,
|
||||
) {
|
||||
self.send_sync_message(SyncMessage::RpcBlob {
|
||||
request_id: SyncRequestId::SingleBlob { id },
|
||||
sync_request_id: SyncRequestId::SingleBlob { id },
|
||||
peer_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: D,
|
||||
@@ -583,7 +583,7 @@ impl TestRig {
|
||||
fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
|
||||
self.send_sync_message(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id: SyncRequestId::SingleBlock { id },
|
||||
sync_request_id: SyncRequestId::SingleBlock { id },
|
||||
error,
|
||||
})
|
||||
}
|
||||
@@ -602,7 +602,7 @@ impl TestRig {
|
||||
fn single_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
|
||||
self.send_sync_message(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id: SyncRequestId::SingleBlock { id },
|
||||
sync_request_id: SyncRequestId::SingleBlock { id },
|
||||
error,
|
||||
})
|
||||
}
|
||||
@@ -614,11 +614,11 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn return_empty_sampling_request(&mut self, (request_id, _): DCByRootId) {
|
||||
fn return_empty_sampling_request(&mut self, (sync_request_id, _): DCByRootId) {
|
||||
let peer_id = PeerId::random();
|
||||
// Send stream termination
|
||||
self.send_sync_message(SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
data_column: None,
|
||||
seen_timestamp: timestamp_now(),
|
||||
@@ -631,10 +631,10 @@ impl TestRig {
|
||||
peer_id: PeerId,
|
||||
error: RPCError,
|
||||
) {
|
||||
for (request_id, _) in sampling_ids {
|
||||
for (sync_request_id, _) in sampling_ids {
|
||||
self.send_sync_message(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
error: error.clone(),
|
||||
})
|
||||
}
|
||||
@@ -760,14 +760,14 @@ impl TestRig {
|
||||
|
||||
fn complete_data_columns_by_root_request(
|
||||
&mut self,
|
||||
(request_id, _): DCByRootId,
|
||||
(sync_request_id, _): DCByRootId,
|
||||
data_columns: &[Arc<DataColumnSidecar<E>>],
|
||||
) {
|
||||
let peer_id = PeerId::random();
|
||||
for data_column in data_columns {
|
||||
// Send chunks
|
||||
self.send_sync_message(SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
data_column: Some(data_column.clone()),
|
||||
seen_timestamp: timestamp_now(),
|
||||
@@ -775,7 +775,7 @@ impl TestRig {
|
||||
}
|
||||
// Send stream termination
|
||||
self.send_sync_message(SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
data_column: None,
|
||||
seen_timestamp: timestamp_now(),
|
||||
@@ -785,17 +785,17 @@ impl TestRig {
|
||||
/// Return RPCErrors for all active requests of peer
|
||||
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
|
||||
self.drain_network_rx();
|
||||
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
|
||||
while let Ok(sync_request_id) = self.pop_received_network_event(|ev| match ev {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: AppRequestId::Sync(id),
|
||||
app_request_id: AppRequestId::Sync(id),
|
||||
..
|
||||
} if *peer_id == disconnected_peer_id => Some(*id),
|
||||
_ => None,
|
||||
}) {
|
||||
self.send_sync_message(SyncMessage::RpcError {
|
||||
peer_id: disconnected_peer_id,
|
||||
request_id,
|
||||
sync_request_id,
|
||||
error: RPCError::Disconnected,
|
||||
});
|
||||
}
|
||||
@@ -879,7 +879,7 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: RequestType::BlocksByRoot(request),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
|
||||
_ => None,
|
||||
})
|
||||
@@ -899,7 +899,7 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: RequestType::BlobsByRoot(request),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
} if request
|
||||
.blob_ids
|
||||
.to_vec()
|
||||
@@ -924,7 +924,7 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: RequestType::BlocksByRoot(request),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
|
||||
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
|
||||
_ => None,
|
||||
})
|
||||
@@ -946,7 +946,7 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: RequestType::BlobsByRoot(request),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
} if request
|
||||
.blob_ids
|
||||
.to_vec()
|
||||
@@ -974,7 +974,8 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: RequestType::DataColumnsByRoot(request),
|
||||
request_id: AppRequestId::Sync(id @ SyncRequestId::DataColumnsByRoot { .. }),
|
||||
app_request_id:
|
||||
AppRequestId::Sync(id @ SyncRequestId::DataColumnsByRoot { .. }),
|
||||
} if request
|
||||
.data_column_ids
|
||||
.to_vec()
|
||||
|
||||
@@ -223,7 +223,7 @@ impl TestRig {
|
||||
RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(
|
||||
OldBlocksByRangeRequestV2 { start_slot, .. },
|
||||
)),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
|
||||
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
|
||||
_ => None,
|
||||
})
|
||||
@@ -240,7 +240,7 @@ impl TestRig {
|
||||
RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
|
||||
start_slot, ..
|
||||
}),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
|
||||
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
|
||||
_ => None,
|
||||
}) {
|
||||
@@ -256,7 +256,7 @@ impl TestRig {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot, .. }),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
|
||||
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
|
||||
_ => None,
|
||||
})
|
||||
@@ -283,7 +283,7 @@ impl TestRig {
|
||||
"Completing BlocksByRange request {blocks_req_id:?} with empty stream"
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcBlock {
|
||||
request_id: SyncRequestId::BlocksByRange(blocks_req_id),
|
||||
sync_request_id: SyncRequestId::BlocksByRange(blocks_req_id),
|
||||
peer_id: block_peer,
|
||||
beacon_block: None,
|
||||
seen_timestamp: D,
|
||||
@@ -297,7 +297,7 @@ impl TestRig {
|
||||
"Completing BlobsByRange request {id:?} with empty stream"
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcBlob {
|
||||
request_id: SyncRequestId::BlobsByRange(id),
|
||||
sync_request_id: SyncRequestId::BlobsByRange(id),
|
||||
peer_id,
|
||||
blob_sidecar: None,
|
||||
seen_timestamp: D,
|
||||
@@ -310,7 +310,7 @@ impl TestRig {
|
||||
"Completing DataColumnsByRange request {id:?} with empty stream"
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcDataColumn {
|
||||
request_id: SyncRequestId::DataColumnsByRange(id),
|
||||
sync_request_id: SyncRequestId::DataColumnsByRange(id),
|
||||
peer_id,
|
||||
data_column: None,
|
||||
seen_timestamp: D,
|
||||
|
||||
Reference in New Issue
Block a user