diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index ad03b45db9..098d7efadb 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -15,9 +15,10 @@ use tokio::runtime::Runtime; use tokio::time::sleep; use tracing::{Instrument, debug, error, info_span, warn}; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec, - EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec, - RuntimeVariableList, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, + BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EmptyBlock, Epoch, + EthSpec, FixedBytesExtended, ForkName, Hash256, KzgCommitment, KzgProof, MinimalEthSpec, + RuntimeVariableList, Signature, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; type E = MinimalEthSpec; @@ -949,6 +950,307 @@ fn test_tcp_blocks_by_root_chunked_rpc() { }) } +#[test] +#[allow(clippy::single_match)] +fn test_tcp_columns_by_root_chunked_rpc() { + // Set up the logging. + let log_level = "debug"; + let enable_logging = true; + let _subscriber = build_tracing_subscriber(log_level, enable_logging); + let num_of_columns = E::number_of_columns(); + let messages_to_send = 32 * num_of_columns; + + let spec = Arc::new(spec_with_all_forks_enabled()); + let current_fork_name = ForkName::Fulu; + + let rt = Arc::new(Runtime::new().unwrap()); + // get sender/receiver + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + current_fork_name, + spec.clone(), + Protocol::Tcp, + false, + None, + ) + .await; + + // DataColumnsByRootRequest Request + + let max_request_blocks = spec.max_request_blocks(current_fork_name); + let req = DataColumnsByRootRequest::new( + vec![ + DataColumnsByRootIdentifier { + block_root: Hash256::zero(), + columns: VariableList::new( + (0..E::number_of_columns() as u64).collect::>() + ) + .unwrap(), + }; + max_request_blocks + ], + max_request_blocks, + ); + let req_bytes = req.data_column_ids.as_ssz_bytes(); + let req_decoded = DataColumnsByRootRequest { + data_column_ids: >>::from_ssz_bytes( + &req_bytes, + spec.max_request_blocks(current_fork_name), + ) + .unwrap(), + }; + assert_eq!(req, req_decoded); + let rpc_request = RequestType::DataColumnsByRoot(req); + + // DataColumnsByRoot Response + let data_column = Arc::new(DataColumnSidecar { + index: 1, + signed_block_header: SignedBeaconBlockHeader { + message: BeaconBlockHeader { + slot: 320u64.into(), + proposer_index: 1, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::zero(), + }, + signature: Signature::empty(), + }, + column: vec![vec![0; E::bytes_per_blob()].into()].into(), + kzg_commitments: vec![KzgCommitment::empty_for_testing()].into(), + kzg_proofs: vec![KzgProof::empty()].into(), + kzg_commitments_inclusion_proof: vec![ + Hash256::zero(); + E::kzg_commitments_inclusion_proof_depth() + ] + .into(), + }); + + let rpc_response = Response::DataColumnsByRoot(Some(data_column.clone())); + + // keep count of the number of messages received + let mut messages_received = 0; + // build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + tracing::info!("Sending RPC"); + tokio::time::sleep(Duration::from_secs(1)).await; + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + peer_id: _, + app_request_id: AppRequestId::Router, + response, + } => match response { + Response::DataColumnsByRoot(Some(sidecar)) => { + assert_eq!(sidecar, data_column.clone()); + messages_received += 1; + tracing::info!("Chunk received"); + } + Response::DataColumnsByRoot(None) => { + // should be exactly messages_to_send + assert_eq!(messages_received, messages_to_send); + // end the test + return; + } + _ => {} // Ignore other RPC messages + }, + _ => {} // Ignore other behaviour events + } + } + } + .instrument(info_span!("Sender")); + + // build the receiver future + let receiver_future = async { + loop { + match receiver.next_event().await { + NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + } => { + if request_type == rpc_request { + // send the response + tracing::info!("Receiver got request"); + + for _ in 0..messages_to_send { + receiver.send_response( + peer_id, + inbound_request_id, + rpc_response.clone(), + ); + tracing::info!("Sending message"); + } + // send the stream termination + receiver.send_response( + peer_id, + inbound_request_id, + Response::DataColumnsByRoot(None), + ); + tracing::info!("Send stream term"); + } + } + e => { + tracing::info!(?e, "Got event"); + } // Ignore other events + } + } + } + .instrument(info_span!("Receiver")); + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(300)) => { + panic!("Future timed out"); + } + } + }) +} + +#[test] +#[allow(clippy::single_match)] +fn test_tcp_columns_by_range_chunked_rpc() { + // Set up the logging. + let log_level = "debug"; + let enable_logging = true; + let _subscriber = build_tracing_subscriber(log_level, enable_logging); + + let messages_to_send = 32; + + let spec = Arc::new(spec_with_all_forks_enabled()); + let current_fork_name = ForkName::Fulu; + + let rt = Arc::new(Runtime::new().unwrap()); + // get sender/receiver + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + current_fork_name, + spec.clone(), + Protocol::Tcp, + false, + None, + ) + .await; + + // DataColumnsByRange Request + let rpc_request = RequestType::DataColumnsByRange(DataColumnsByRangeRequest { + start_slot: 320, + count: 32, + columns: (0..E::number_of_columns() as u64).collect(), + }); + + // DataColumnsByRange Response + let data_column = Arc::new(DataColumnSidecar { + index: 1, + signed_block_header: SignedBeaconBlockHeader { + message: BeaconBlockHeader { + slot: 320u64.into(), + proposer_index: 1, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::zero(), + }, + signature: Signature::empty(), + }, + column: vec![vec![0; E::bytes_per_blob()].into()].into(), + kzg_commitments: vec![KzgCommitment::empty_for_testing()].into(), + kzg_proofs: vec![KzgProof::empty()].into(), + kzg_commitments_inclusion_proof: vec![ + Hash256::zero(); + E::kzg_commitments_inclusion_proof_depth() + ] + .into(), + }); + + let rpc_response = Response::DataColumnsByRange(Some(data_column.clone())); + + // keep count of the number of messages received + let mut messages_received = 0; + // build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + tracing::info!("Sending RPC"); + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + peer_id: _, + app_request_id: AppRequestId::Router, + response, + } => match response { + Response::DataColumnsByRange(Some(sidecar)) => { + assert_eq!(sidecar, data_column.clone()); + messages_received += 1; + tracing::info!("Chunk received"); + } + Response::DataColumnsByRange(None) => { + // should be exactly messages_to_send + assert_eq!(messages_received, messages_to_send); + // end the test + return; + } + _ => {} // Ignore other RPC messages + }, + _ => {} // Ignore other behaviour events + } + } + } + .instrument(info_span!("Sender")); + + // build the receiver future + let receiver_future = async { + loop { + match receiver.next_event().await { + NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + } => { + if request_type == rpc_request { + // send the response + tracing::info!("Receiver got request"); + + for _ in 0..messages_to_send { + receiver.send_response( + peer_id, + inbound_request_id, + rpc_response.clone(), + ); + tracing::info!("Sending message"); + } + // send the stream termination + receiver.send_response( + peer_id, + inbound_request_id, + Response::DataColumnsByRange(None), + ); + tracing::info!("Send stream term"); + } + } + _ => {} // Ignore other events + } + } + } + .instrument(info_span!("Receiver")); + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(300)) => { + panic!("Future timed out"); + } + } + }) +} + // Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received #[test] fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index f42159252a..a236d0084f 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1981,7 +1981,7 @@ fn max_data_columns_by_root_request_common(max_request_blocks: u64) let empty_data_columns_by_root_id = DataColumnsByRootIdentifier { block_root: Hash256::zero(), - columns: VariableList::from(vec![0]), + columns: VariableList::from(vec![0; E::number_of_columns()]), }; RuntimeVariableList::>::from_vec(