mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
It is unclear from the logs during rpc_tests whether the output comes from the sender or the receiver. ``` 2025-03-20T11:21:50.038868Z DEBUG rpc_tests: Sending message 2 2025-03-20T11:21:50.041129Z DEBUG rpc_tests: Sender received a response 2025-03-20T11:21:50.041242Z DEBUG rpc_tests: Chunk received 2025-03-20T11:21:51.040837Z DEBUG rpc_tests: Sending message 3 2025-03-20T11:21:51.043635Z DEBUG rpc_tests: Sender received a response 2025-03-20T11:21:51.043855Z DEBUG rpc_tests: Chunk received 2025-03-20T11:21:52.043427Z DEBUG rpc_tests: Sending message 4 2025-03-20T11:21:52.052831Z DEBUG rpc_tests: Sender received a response 2025-03-20T11:21:52.052953Z DEBUG rpc_tests: Chunk received 2025-03-20T11:21:53.045589Z DEBUG rpc_tests: Sending message 5 2025-03-20T11:21:53.052718Z DEBUG rpc_tests: Sender received a response 2025-03-20T11:21:53.052825Z DEBUG rpc_tests: Chunk received 2025-03-20T11:21:54.049157Z DEBUG rpc_tests: Sending message 6 2025-03-20T11:21:54.058072Z DEBUG rpc_tests: Sender received a response 2025-03-20T11:21:54.058603Z DEBUG rpc_tests: Chunk received 2025-03-20T11:21:55.018822Z DEBUG Swarm::poll: libp2p_gossipsub::behaviour: Starting heartbeat 2025-03-20T11:21:55.018953Z DEBUG Swarm::poll: libp2p_gossipsub::behaviour: Completed Heartbeat 2025-03-20T11:21:55.027100Z DEBUG Swarm::poll: libp2p_gossipsub::behaviour: Starting heartbeat 2025-03-20T11:21:55.027199Z DEBUG Swarm::poll: libp2p_gossipsub::behaviour: Completed Heartbeat ``` Added `info_span` to both the sender and receiver in each test. ``` 2025-03-20T11:20:04.172699Z DEBUG Receiver: rpc_tests: Sending message 2 2025-03-20T11:20:04.179147Z DEBUG Sender: rpc_tests: Sender received a response 2025-03-20T11:20:04.179281Z DEBUG Sender: rpc_tests: Chunk received 2025-03-20T11:20:05.175300Z DEBUG Receiver: rpc_tests: Sending message 3 2025-03-20T11:20:05.177202Z DEBUG Sender: rpc_tests: Sender received a response 2025-03-20T11:20:05.177292Z DEBUG Sender: rpc_tests: Chunk received 2025-03-20T11:20:06.176868Z DEBUG Receiver: rpc_tests: Sending message 4 2025-03-20T11:20:06.179379Z DEBUG Sender: rpc_tests: Sender received a response 2025-03-20T11:20:06.179460Z DEBUG Sender: rpc_tests: Chunk received 2025-03-20T11:20:07.179257Z DEBUG Receiver: rpc_tests: Sending message 5 2025-03-20T11:20:07.181386Z DEBUG Sender: rpc_tests: Sender received a response 2025-03-20T11:20:07.181503Z DEBUG Sender: rpc_tests: Chunk received 2025-03-20T11:20:08.181428Z DEBUG Receiver: rpc_tests: Sending message 6 2025-03-20T11:20:08.190231Z DEBUG Sender: rpc_tests: Sender received a response 2025-03-20T11:20:08.190358Z DEBUG Sender: rpc_tests: Chunk received 2025-03-20T11:20:09.151699Z DEBUG Sender:Swarm::poll: libp2p_gossipsub::behaviour: Starting heartbeat 2025-03-20T11:20:09.151748Z DEBUG Sender:Swarm::poll: libp2p_gossipsub::behaviour: Completed Heartbeat 2025-03-20T11:20:09.160244Z DEBUG Receiver:Swarm::poll: libp2p_gossipsub::behaviour: Starting heartbeat 2025-03-20T11:20:09.160288Z DEBUG Receiver:Swarm::poll: libp2p_gossipsub::behaviour: Completed Heartbeat ```
1416 lines
53 KiB
Rust
1416 lines
53 KiB
Rust
#![cfg(test)]
|
|
|
|
mod common;
|
|
|
|
use common::{build_tracing_subscriber, Protocol};
|
|
use lighthouse_network::rpc::{methods::*, RequestType};
|
|
use lighthouse_network::service::api_types::AppRequestId;
|
|
use lighthouse_network::{NetworkEvent, ReportSource, Response};
|
|
use ssz::Encode;
|
|
use ssz_types::VariableList;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
use tokio::runtime::Runtime;
|
|
use tokio::time::sleep;
|
|
use tracing::{debug, error, info_span, warn, Instrument};
|
|
use types::{
|
|
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
|
|
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec,
|
|
RuntimeVariableList, Signature, SignedBeaconBlock, Slot,
|
|
};
|
|
|
|
type E = MinimalEthSpec;
|
|
|
|
/// Bellatrix block with length < max_rpc_size.
|
|
fn bellatrix_block_small(spec: &ChainSpec) -> BeaconBlock<E> {
|
|
let mut block = BeaconBlockBellatrix::<E>::empty(spec);
|
|
let tx = VariableList::from(vec![0; 1024]);
|
|
let txs = VariableList::from(std::iter::repeat_n(tx, 5000).collect::<Vec<_>>());
|
|
|
|
block.body.execution_payload.execution_payload.transactions = txs;
|
|
|
|
let block = BeaconBlock::Bellatrix(block);
|
|
assert!(block.ssz_bytes_len() <= spec.max_payload_size as usize);
|
|
block
|
|
}
|
|
|
|
/// Bellatrix block with length > MAX_RPC_SIZE.
|
|
/// The max limit for a bellatrix block is in the order of ~16GiB which wouldn't fit in memory.
|
|
/// Hence, we generate a bellatrix block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer.
|
|
fn bellatrix_block_large(spec: &ChainSpec) -> BeaconBlock<E> {
|
|
let mut block = BeaconBlockBellatrix::<E>::empty(spec);
|
|
let tx = VariableList::from(vec![0; 1024]);
|
|
let txs = VariableList::from(std::iter::repeat_n(tx, 100000).collect::<Vec<_>>());
|
|
|
|
block.body.execution_payload.execution_payload.transactions = txs;
|
|
|
|
let block = BeaconBlock::Bellatrix(block);
|
|
assert!(block.ssz_bytes_len() > spec.max_payload_size as usize);
|
|
block
|
|
}
|
|
|
|
// Tests the STATUS RPC message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_tcp_status_rpc() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec,
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// Dummy STATUS RPC message
|
|
let rpc_request = RequestType::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::zero(),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::zero(),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// Dummy STATUS RPC message
|
|
let rpc_response = Response::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::zero(),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::zero(),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: AppRequestId::Router,
|
|
response,
|
|
} => {
|
|
// Should receive the RPC response
|
|
debug!("Sender Received");
|
|
assert_eq!(response, rpc_response.clone());
|
|
debug!("Sender Completed");
|
|
return;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
debug!("Receiver Received");
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
rpc_response.clone(),
|
|
);
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests a streamed BlocksByRange RPC Message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_tcp_blocks_by_range_chunked_rpc() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let messages_to_send = 6;
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Bellatrix,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRange Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
|
|
start_slot: 0,
|
|
count: messages_to_send,
|
|
step: 1,
|
|
}));
|
|
|
|
// BlocksByRange Response
|
|
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_base = Response::BlocksByRange(Some(Arc::new(signed_full_block)));
|
|
|
|
let full_block = BeaconBlock::Altair(BeaconBlockAltair::<E>::full(&spec));
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_altair = Response::BlocksByRange(Some(Arc::new(signed_full_block)));
|
|
|
|
let full_block = bellatrix_block_small(&spec);
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_bellatrix_small =
|
|
Response::BlocksByRange(Some(Arc::new(signed_full_block)));
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: _,
|
|
response,
|
|
} => {
|
|
warn!("Sender received a response");
|
|
match response {
|
|
Response::BlocksByRange(Some(_)) => {
|
|
if messages_received < 2 {
|
|
assert_eq!(response, rpc_response_base.clone());
|
|
} else if messages_received < 4 {
|
|
assert_eq!(response, rpc_response_altair.clone());
|
|
} else {
|
|
assert_eq!(response, rpc_response_bellatrix_small.clone());
|
|
}
|
|
messages_received += 1;
|
|
warn!("Chunk received");
|
|
}
|
|
Response::BlocksByRange(None) => {
|
|
// should be exactly `messages_to_send` messages before terminating
|
|
assert_eq!(messages_received, messages_to_send);
|
|
// end the test
|
|
return;
|
|
}
|
|
_ => panic!("Invalid RPC received"),
|
|
}
|
|
}
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
for i in 0..messages_to_send {
|
|
// Send first third of responses as base blocks,
|
|
// second as altair and third as bellatrix.
|
|
let rpc_response = if i < 2 {
|
|
rpc_response_base.clone()
|
|
} else if i < 4 {
|
|
rpc_response_altair.clone()
|
|
} else {
|
|
rpc_response_bellatrix_small.clone()
|
|
};
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
rpc_response.clone(),
|
|
);
|
|
}
|
|
// send the stream termination
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
Response::BlocksByRange(None),
|
|
);
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests a streamed BlobsByRange RPC Message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_blobs_by_range_chunked_rpc() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let slot_count = 32;
|
|
let messages_to_send = 34;
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let spec = Arc::new(E::default_spec());
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Deneb,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlobsByRange Request
|
|
let rpc_request = RequestType::BlobsByRange(BlobsByRangeRequest {
|
|
start_slot: 0,
|
|
count: slot_count,
|
|
});
|
|
|
|
// BlocksByRange Response
|
|
let blob = BlobSidecar::<E>::empty();
|
|
|
|
let rpc_response = Response::BlobsByRange(Some(Arc::new(blob)));
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: _,
|
|
response,
|
|
} => {
|
|
warn!("Sender received a response");
|
|
match response {
|
|
Response::BlobsByRange(Some(_)) => {
|
|
assert_eq!(response, rpc_response.clone());
|
|
messages_received += 1;
|
|
warn!("Chunk received");
|
|
}
|
|
Response::BlobsByRange(None) => {
|
|
// should be exactly `messages_to_send` messages before terminating
|
|
assert_eq!(messages_received, messages_to_send);
|
|
// end the test
|
|
return;
|
|
}
|
|
_ => panic!("Invalid RPC received"),
|
|
}
|
|
}
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
for _ in 0..messages_to_send {
|
|
// Send first third of responses as base blocks,
|
|
// second as altair and third as bellatrix.
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
rpc_response.clone(),
|
|
);
|
|
}
|
|
// send the stream termination
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
Response::BlobsByRange(None),
|
|
);
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests rejection of blocks over `MAX_RPC_SIZE`.
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_tcp_blocks_by_range_over_limit() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let messages_to_send = 5;
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Bellatrix,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRange Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRange(OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
|
|
start_slot: 0,
|
|
count: messages_to_send,
|
|
step: 1,
|
|
}));
|
|
|
|
// BlocksByRange Response
|
|
let full_block = bellatrix_block_large(&spec);
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_bellatrix_large =
|
|
Response::BlocksByRange(Some(Arc::new(signed_full_block)));
|
|
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
// The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE
|
|
NetworkEvent::RPCFailed { app_request_id, .. } => {
|
|
assert!(matches!(app_request_id, AppRequestId::Router));
|
|
return;
|
|
}
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
for _ in 0..messages_to_send {
|
|
let rpc_response = rpc_response_bellatrix_large.clone();
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
rpc_response.clone(),
|
|
);
|
|
}
|
|
// send the stream termination
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
Response::BlocksByRange(None),
|
|
);
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests that a streamed BlocksByRange RPC Message terminates when all expected chunks were received
|
|
#[test]
|
|
fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let messages_to_send = 10;
|
|
let extra_messages_to_send = 10;
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRange Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
|
|
start_slot: 0,
|
|
count: messages_to_send,
|
|
step: 1,
|
|
}));
|
|
|
|
// BlocksByRange Response
|
|
let empty_block = BeaconBlock::empty(&spec);
|
|
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
|
|
let rpc_response = Response::BlocksByRange(Some(Arc::new(empty_signed)));
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received: u64 = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: _,
|
|
response,
|
|
} =>
|
|
// Should receive the RPC response
|
|
{
|
|
debug!("Sender received a response");
|
|
match response {
|
|
Response::BlocksByRange(Some(_)) => {
|
|
assert_eq!(response, rpc_response.clone());
|
|
messages_received += 1;
|
|
}
|
|
Response::BlocksByRange(None) => {
|
|
// should be exactly 10 messages, as requested
|
|
assert_eq!(messages_received, messages_to_send);
|
|
}
|
|
_ => panic!("Invalid RPC received"),
|
|
}
|
|
}
|
|
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
|
|
// messages
|
|
let mut message_info = None;
|
|
// the number of messages we've sent
|
|
let mut messages_sent = 0;
|
|
let receiver_future = async {
|
|
loop {
|
|
// this future either drives the sending/receiving or times out allowing messages to be
|
|
// sent in the timeout
|
|
match futures::future::select(
|
|
Box::pin(receiver.next_event()),
|
|
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
|
|
)
|
|
.await
|
|
{
|
|
futures::future::Either::Left((
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
},
|
|
_,
|
|
)) => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
message_info = Some((peer_id, inbound_request_id));
|
|
}
|
|
}
|
|
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
|
|
_ => continue,
|
|
}
|
|
|
|
// if we need to send messages send them here. This will happen after a delay
|
|
if message_info.is_some() {
|
|
messages_sent += 1;
|
|
let (peer_id, inbound_request_id) = message_info.as_ref().unwrap();
|
|
receiver.send_response(*peer_id, *inbound_request_id, rpc_response.clone());
|
|
debug!("Sending message {}", messages_sent);
|
|
if messages_sent == messages_to_send + extra_messages_to_send {
|
|
// stop sending messages
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests an empty response to a BlocksByRange RPC Message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_tcp_blocks_by_range_single_empty_rpc() {
|
|
// Set up the logging.
|
|
let log_level = "trace";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRange Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
|
|
start_slot: 0,
|
|
count: 10,
|
|
step: 1,
|
|
}));
|
|
|
|
// BlocksByRange Response
|
|
let empty_block = BeaconBlock::empty(&spec);
|
|
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
|
|
let rpc_response = Response::BlocksByRange(Some(Arc::new(empty_signed)));
|
|
|
|
let messages_to_send = 1;
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: AppRequestId::Router,
|
|
response,
|
|
} => match response {
|
|
Response::BlocksByRange(Some(_)) => {
|
|
assert_eq!(response, rpc_response.clone());
|
|
messages_received += 1;
|
|
warn!("Chunk received");
|
|
}
|
|
Response::BlocksByRange(None) => {
|
|
// should be exactly 10 messages before terminating
|
|
assert_eq!(messages_received, messages_to_send);
|
|
// end the test
|
|
return;
|
|
}
|
|
_ => panic!("Invalid RPC received"),
|
|
},
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
|
|
for _ in 1..=messages_to_send {
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
rpc_response.clone(),
|
|
);
|
|
}
|
|
// send the stream termination
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
Response::BlocksByRange(None),
|
|
);
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(20)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests a streamed, chunked BlocksByRoot RPC Message
|
|
// The size of the response is a full `BeaconBlock`
|
|
// which is greater than the Snappy frame size. Hence, this test
|
|
// serves to test the snappy framing format as well.
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn test_tcp_blocks_by_root_chunked_rpc() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let messages_to_send = 6;
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
// get sender/receiver
|
|
rt.block_on(async {
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Bellatrix,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRoot Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
|
|
block_roots: RuntimeVariableList::from_vec(
|
|
vec![
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
],
|
|
spec.max_request_blocks_upper_bound(),
|
|
),
|
|
}));
|
|
|
|
// BlocksByRoot Response
|
|
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_base = Response::BlocksByRoot(Some(Arc::new(signed_full_block)));
|
|
|
|
let full_block = BeaconBlock::Altair(BeaconBlockAltair::<E>::full(&spec));
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_altair = Response::BlocksByRoot(Some(Arc::new(signed_full_block)));
|
|
|
|
let full_block = bellatrix_block_small(&spec);
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response_bellatrix_small =
|
|
Response::BlocksByRoot(Some(Arc::new(signed_full_block)));
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: AppRequestId::Router,
|
|
response,
|
|
} => match response {
|
|
Response::BlocksByRoot(Some(_)) => {
|
|
if messages_received < 2 {
|
|
assert_eq!(response, rpc_response_base.clone());
|
|
} else if messages_received < 4 {
|
|
assert_eq!(response, rpc_response_altair.clone());
|
|
} else {
|
|
assert_eq!(response, rpc_response_bellatrix_small.clone());
|
|
}
|
|
messages_received += 1;
|
|
debug!("Chunk received");
|
|
}
|
|
Response::BlocksByRoot(None) => {
|
|
// should be exactly messages_to_send
|
|
assert_eq!(messages_received, messages_to_send);
|
|
// end the test
|
|
return;
|
|
}
|
|
_ => {} // Ignore other RPC messages
|
|
},
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
match receiver.next_event().await {
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
debug!("Receiver got request");
|
|
|
|
for i in 0..messages_to_send {
|
|
// Send equal base, altair and bellatrix blocks
|
|
let rpc_response = if i < 2 {
|
|
rpc_response_base.clone()
|
|
} else if i < 4 {
|
|
rpc_response_altair.clone()
|
|
} else {
|
|
rpc_response_bellatrix_small.clone()
|
|
};
|
|
receiver.send_response(peer_id, inbound_request_id, rpc_response);
|
|
debug!("Sending message");
|
|
}
|
|
// send the stream termination
|
|
receiver.send_response(
|
|
peer_id,
|
|
inbound_request_id,
|
|
Response::BlocksByRange(None),
|
|
);
|
|
debug!("Send stream term");
|
|
}
|
|
}
|
|
_ => {} // Ignore other events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received
|
|
#[test]
|
|
fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
|
// Set up the logging.
|
|
let log_level = "debug";
|
|
let enable_logging = true;
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let messages_to_send: u64 = 10;
|
|
let extra_messages_to_send: u64 = 10;
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
// get sender/receiver
|
|
rt.block_on(async {
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec.clone(),
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// BlocksByRoot Request
|
|
let rpc_request =
|
|
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
|
|
block_roots: RuntimeVariableList::from_vec(
|
|
vec![
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
Hash256::zero(),
|
|
],
|
|
spec.max_request_blocks_upper_bound(),
|
|
),
|
|
}));
|
|
|
|
// BlocksByRoot Response
|
|
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
|
|
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
|
|
let rpc_response = Response::BlocksByRoot(Some(Arc::new(signed_full_block)));
|
|
|
|
// keep count of the number of messages received
|
|
let mut messages_received = 0;
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a STATUS message
|
|
debug!("Sending RPC");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id: _,
|
|
app_request_id: AppRequestId::Router,
|
|
response,
|
|
} => {
|
|
debug!("Sender received a response");
|
|
match response {
|
|
Response::BlocksByRoot(Some(_)) => {
|
|
assert_eq!(response, rpc_response.clone());
|
|
messages_received += 1;
|
|
debug!("Chunk received");
|
|
}
|
|
Response::BlocksByRoot(None) => {
|
|
// should be exactly messages_to_send
|
|
assert_eq!(messages_received, messages_to_send);
|
|
// end the test
|
|
return;
|
|
}
|
|
_ => {} // Ignore other RPC messages
|
|
}
|
|
}
|
|
_ => {} // Ignore other behaviour events
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
|
|
// messages
|
|
let mut message_info = None;
|
|
// the number of messages we've sent
|
|
let mut messages_sent = 0;
|
|
let receiver_future = async {
|
|
loop {
|
|
// this future either drives the sending/receiving or times out allowing messages to be
|
|
// sent in the timeout
|
|
match futures::future::select(
|
|
Box::pin(receiver.next_event()),
|
|
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
|
|
)
|
|
.await
|
|
{
|
|
futures::future::Either::Left((
|
|
NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
},
|
|
_,
|
|
)) => {
|
|
if request_type == rpc_request {
|
|
// send the response
|
|
warn!("Receiver got request");
|
|
message_info = Some((peer_id, inbound_request_id));
|
|
}
|
|
}
|
|
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
|
|
_ => continue,
|
|
}
|
|
|
|
// if we need to send messages send them here. This will happen after a delay
|
|
if message_info.is_some() {
|
|
messages_sent += 1;
|
|
let (peer_id, inbound_request_id) = message_info.as_ref().unwrap();
|
|
receiver.send_response(*peer_id, *inbound_request_id, rpc_response.clone());
|
|
debug!("Sending message {}", messages_sent);
|
|
if messages_sent == messages_to_send + extra_messages_to_send {
|
|
// stop sending messages
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
|
|
/// Goodbye message.
|
|
fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) {
|
|
// Set up the logging.
|
|
build_tracing_subscriber(log_level, enable_logging);
|
|
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
// get sender/receiver
|
|
rt.block_on(async {
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec,
|
|
protocol,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// build the sender future
|
|
let sender_future = async {
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
// Send a goodbye and disconnect
|
|
debug!("Sending RPC");
|
|
sender.goodbye_peer(
|
|
&peer_id,
|
|
GoodbyeReason::IrrelevantNetwork,
|
|
ReportSource::SyncService,
|
|
);
|
|
}
|
|
NetworkEvent::PeerDisconnected(_) => {
|
|
return;
|
|
}
|
|
_ => {} // Ignore other RPC messages
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender"));
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
if let NetworkEvent::PeerDisconnected(_) = receiver.next_event().await {
|
|
// Should receive sent RPC request
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver"));
|
|
|
|
let total_future = futures::future::join(sender_future, receiver_future);
|
|
|
|
tokio::select! {
|
|
_ = total_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests a Goodbye RPC message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn tcp_test_goodbye_rpc() {
|
|
let log_level = "debug";
|
|
let enabled_logging = true;
|
|
goodbye_test(log_level, enabled_logging, Protocol::Tcp);
|
|
}
|
|
|
|
// Tests a Goodbye RPC message
|
|
#[test]
|
|
#[allow(clippy::single_match)]
|
|
fn quic_test_goodbye_rpc() {
|
|
let log_level = "debug";
|
|
let enabled_logging = true;
|
|
goodbye_test(log_level, enabled_logging, Protocol::Quic);
|
|
}
|
|
|
|
// Test that the receiver delays the responses during response rate-limiting.
|
|
#[test]
|
|
fn test_delayed_rpc_response() {
|
|
// Set up the logging.
|
|
build_tracing_subscriber("debug", true);
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
// Allow 1 token to be use used every 3 seconds.
|
|
const QUOTA_SEC: u64 = 3;
|
|
|
|
rt.block_on(async {
|
|
// get sender/receiver
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec,
|
|
Protocol::Tcp,
|
|
false,
|
|
// Configure a quota for STATUS responses of 1 token every 3 seconds.
|
|
Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()),
|
|
)
|
|
.await;
|
|
|
|
// Dummy STATUS RPC message
|
|
let rpc_request = RequestType::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::from_low_u64_be(0),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::from_low_u64_be(0),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// Dummy STATUS RPC message
|
|
let rpc_response = Response::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::from_low_u64_be(0),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::from_low_u64_be(0),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// build the sender future
|
|
let sender_future = async {
|
|
let mut request_id = 1;
|
|
let mut request_sent_at = Instant::now();
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
debug!(%request_id, "Sending RPC request");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
request_sent_at = Instant::now();
|
|
}
|
|
NetworkEvent::ResponseReceived {
|
|
peer_id,
|
|
app_request_id: _,
|
|
response,
|
|
} => {
|
|
debug!(%request_id, elapsed = ?request_sent_at.elapsed(), "Sender received response");
|
|
assert_eq!(response, rpc_response);
|
|
|
|
match request_id {
|
|
1 => {
|
|
// The first response is returned instantly.
|
|
assert!(request_sent_at.elapsed() < Duration::from_millis(100));
|
|
}
|
|
2..=5 => {
|
|
// The second and subsequent responses are delayed due to the response rate-limiter on the receiver side.
|
|
// Adding a slight margin to the elapsed time check to account for potential timing issues caused by system
|
|
// scheduling or execution delays during testing.
|
|
assert!(
|
|
request_sent_at.elapsed()
|
|
> (Duration::from_secs(QUOTA_SEC)
|
|
- Duration::from_millis(100))
|
|
);
|
|
if request_id == 5 {
|
|
// End the test
|
|
return;
|
|
}
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
|
|
request_id += 1;
|
|
debug!(%request_id, "Sending RPC request");
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
request_sent_at = Instant::now();
|
|
}
|
|
NetworkEvent::RPCFailed {
|
|
app_request_id: _,
|
|
peer_id: _,
|
|
error,
|
|
} => {
|
|
error!(?error, "RPC Failed");
|
|
panic!("Rpc failed.");
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
};
|
|
|
|
// build the receiver future
|
|
let receiver_future = async {
|
|
loop {
|
|
if let NetworkEvent::RequestReceived {
|
|
peer_id,
|
|
inbound_request_id,
|
|
request_type,
|
|
} = receiver.next_event().await
|
|
{
|
|
assert_eq!(request_type, rpc_request);
|
|
debug!("Receiver received request");
|
|
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
|
|
}
|
|
}
|
|
};
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at
|
|
// once, thanks to the self-limiter on the sender side.
|
|
#[test]
|
|
fn test_active_requests() {
|
|
// Set up the logging.
|
|
build_tracing_subscriber("debug", true);
|
|
let rt = Arc::new(Runtime::new().unwrap());
|
|
let spec = Arc::new(E::default_spec());
|
|
|
|
rt.block_on(async {
|
|
// Get sender/receiver.
|
|
let (mut sender, mut receiver) = common::build_node_pair(
|
|
Arc::downgrade(&rt),
|
|
ForkName::Base,
|
|
spec,
|
|
Protocol::Tcp,
|
|
false,
|
|
None,
|
|
)
|
|
.await;
|
|
|
|
// Dummy STATUS RPC request.
|
|
let rpc_request = RequestType::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::from_low_u64_be(0),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::from_low_u64_be(0),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// Dummy STATUS RPC response.
|
|
let rpc_response = Response::Status(StatusMessage {
|
|
fork_digest: [0; 4],
|
|
finalized_root: Hash256::zero(),
|
|
finalized_epoch: Epoch::new(1),
|
|
head_root: Hash256::zero(),
|
|
head_slot: Slot::new(1),
|
|
});
|
|
|
|
// Number of requests.
|
|
const REQUESTS: u8 = 10;
|
|
|
|
// Build the sender future.
|
|
let sender_future = async {
|
|
let mut response_received = 0;
|
|
loop {
|
|
match sender.next_event().await {
|
|
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
|
debug!("Sending RPC request");
|
|
// Send requests in quick succession to intentionally trigger request queueing in the self-limiter.
|
|
for _ in 0..REQUESTS {
|
|
sender
|
|
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
|
.unwrap();
|
|
}
|
|
}
|
|
NetworkEvent::ResponseReceived { response, .. } => {
|
|
debug!(?response, "Sender received response");
|
|
if matches!(response, Response::Status(_)) {
|
|
response_received += 1;
|
|
}
|
|
}
|
|
NetworkEvent::RPCFailed {
|
|
app_request_id: _,
|
|
peer_id: _,
|
|
error,
|
|
} => panic!("RPC failed: {:?}", error),
|
|
_ => {}
|
|
}
|
|
|
|
if response_received == REQUESTS {
|
|
return;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Build the receiver future.
|
|
let receiver_future = async {
|
|
let mut received_requests = vec![];
|
|
loop {
|
|
tokio::select! {
|
|
event = receiver.next_event() => {
|
|
if let NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } = event {
|
|
debug!(?request_type, "Receiver received request");
|
|
if matches!(request_type, RequestType::Status(_)) {
|
|
received_requests.push((peer_id, inbound_request_id));
|
|
}
|
|
}
|
|
}
|
|
// Introduce a delay in sending responses to trigger request queueing on the sender side.
|
|
_ = sleep(Duration::from_secs(3)) => {
|
|
for (peer_id, inbound_request_id) in received_requests.drain(..) {
|
|
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
tokio::select! {
|
|
_ = sender_future => {}
|
|
_ = receiver_future => {}
|
|
_ = sleep(Duration::from_secs(30)) => {
|
|
panic!("Future timed out");
|
|
}
|
|
}
|
|
})
|
|
}
|