mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Fix DataColumnsByRoot request limit validation bug (#7928)
Fixes #7926
This was a bug I introduced in #7890 and @pawanjay176 noticed it on some running nodes, and added an RPC test to confirm it.
The culprit is this line, where I failed to fill the vec to its max size, so it doesn't calculate the max size properly, resulting in all `DataColumnsByRoot` requests exceeding the max size during validation:
d24a6d2a45/consensus/types/src/chain_spec.rs (L1984)
The PR fixes this and includes new regression tests for this fix.
This commit is contained in:
@@ -15,9 +15,10 @@ use tokio::runtime::Runtime;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{Instrument, debug, error, info_span, warn};
|
||||
use types::{
|
||||
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
|
||||
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec,
|
||||
RuntimeVariableList, Signature, SignedBeaconBlock, Slot,
|
||||
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader,
|
||||
BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EmptyBlock, Epoch,
|
||||
EthSpec, FixedBytesExtended, ForkName, Hash256, KzgCommitment, KzgProof, MinimalEthSpec,
|
||||
RuntimeVariableList, Signature, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
|
||||
};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
@@ -949,6 +950,307 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
|
||||
})
|
||||
}
|
||||
|
||||
// Tests a streamed, chunked DataColumnsByRoot RPC message: the sender issues a
// max-size request (every column for `max_request_blocks` block roots) and the
// receiver streams back `32 * number_of_columns` sidecar chunks followed by a
// stream-termination. Also round-trips the request through SSZ as a regression
// test for the request-limit validation bug (#7926).
#[test]
#[allow(clippy::single_match)]
fn test_tcp_columns_by_root_chunked_rpc() {
    // Set up the logging.
    let log_level = "debug";
    let enable_logging = true;
    let _subscriber = build_tracing_subscriber(log_level, enable_logging);
    let num_of_columns = E::number_of_columns();
    let messages_to_send = 32 * num_of_columns;

    let spec = Arc::new(spec_with_all_forks_enabled());
    // DataColumnsByRoot is a Fulu (PeerDAS) RPC, so run the test at that fork.
    let current_fork_name = ForkName::Fulu;

    let rt = Arc::new(Runtime::new().unwrap());
    // get sender/receiver
    rt.block_on(async {
        let (mut sender, mut receiver) = common::build_node_pair(
            Arc::downgrade(&rt),
            current_fork_name,
            spec.clone(),
            Protocol::Tcp,
            false,
            None,
        )
        .await;

        // DataColumnsByRootRequest Request
        //
        // Build the largest request the limits allow: `max_request_blocks`
        // identifiers, each asking for every column. This is the exact shape
        // that used to fail validation before the #7928 fix.
        let max_request_blocks = spec.max_request_blocks(current_fork_name);
        let req = DataColumnsByRootRequest::new(
            vec![
                DataColumnsByRootIdentifier {
                    block_root: Hash256::zero(),
                    columns: VariableList::new(
                        (0..E::number_of_columns() as u64).collect::<Vec<_>>()
                    )
                    .unwrap(),
                };
                max_request_blocks
            ],
            max_request_blocks,
        );
        // Regression check: a max-size request must survive an SSZ
        // encode/decode round trip under the configured request limit.
        let req_bytes = req.data_column_ids.as_ssz_bytes();
        let req_decoded = DataColumnsByRootRequest {
            data_column_ids: <RuntimeVariableList<DataColumnsByRootIdentifier<E>>>::from_ssz_bytes(
                &req_bytes,
                spec.max_request_blocks(current_fork_name),
            )
            .unwrap(),
        };
        assert_eq!(req, req_decoded);
        let rpc_request = RequestType::DataColumnsByRoot(req);

        // DataColumnsByRoot Response
        let data_column = Arc::new(DataColumnSidecar {
            index: 1,
            signed_block_header: SignedBeaconBlockHeader {
                message: BeaconBlockHeader {
                    slot: 320u64.into(),
                    proposer_index: 1,
                    parent_root: Hash256::zero(),
                    state_root: Hash256::zero(),
                    body_root: Hash256::zero(),
                },
                signature: Signature::empty(),
            },
            column: vec![vec![0; E::bytes_per_blob()].into()].into(),
            kzg_commitments: vec![KzgCommitment::empty_for_testing()].into(),
            kzg_proofs: vec![KzgProof::empty()].into(),
            kzg_commitments_inclusion_proof: vec![
                Hash256::zero();
                E::kzg_commitments_inclusion_proof_depth()
            ]
            .into(),
        });

        let rpc_response = Response::DataColumnsByRoot(Some(data_column.clone()));

        // keep count of the number of messages received
        let mut messages_received = 0;
        // build the sender future
        let sender_future = async {
            loop {
                match sender.next_event().await {
                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                        tracing::info!("Sending RPC");
                        // Brief pause so the receiver is fully set up before
                        // the request is fired.
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        sender
                            .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
                            .unwrap();
                    }
                    NetworkEvent::ResponseReceived {
                        peer_id: _,
                        app_request_id: AppRequestId::Router,
                        response,
                    } => match response {
                        Response::DataColumnsByRoot(Some(sidecar)) => {
                            assert_eq!(sidecar, data_column.clone());
                            messages_received += 1;
                            tracing::info!("Chunk received");
                        }
                        Response::DataColumnsByRoot(None) => {
                            // should be exactly messages_to_send
                            assert_eq!(messages_received, messages_to_send);
                            // end the test
                            return;
                        }
                        _ => {} // Ignore other RPC messages
                    },
                    _ => {} // Ignore other behaviour events
                }
            }
        }
        .instrument(info_span!("Sender"));

        // build the receiver future
        let receiver_future = async {
            loop {
                match receiver.next_event().await {
                    NetworkEvent::RequestReceived {
                        peer_id,
                        inbound_request_id,
                        request_type,
                    } => {
                        if request_type == rpc_request {
                            // send the response
                            tracing::info!("Receiver got request");

                            for _ in 0..messages_to_send {
                                receiver.send_response(
                                    peer_id,
                                    inbound_request_id,
                                    rpc_response.clone(),
                                );
                                tracing::info!("Sending message");
                            }
                            // send the stream termination
                            receiver.send_response(
                                peer_id,
                                inbound_request_id,
                                Response::DataColumnsByRoot(None),
                            );
                            tracing::info!("Send stream term");
                        }
                    }
                    e => {
                        tracing::info!(?e, "Got event");
                    } // Ignore other events
                }
            }
        }
        .instrument(info_span!("Receiver"));
        tokio::select! {
            _ = sender_future => {}
            _ = receiver_future => {}
            _ = sleep(Duration::from_secs(300)) => {
                panic!("Future timed out");
            }
        }
    })
}
|
||||
|
||||
// Tests a streamed, chunked DataColumnsByRange RPC message: the sender requests
// 32 slots' worth of every column and the receiver streams back 32 sidecar
// chunks followed by a stream-termination response.
#[test]
#[allow(clippy::single_match)]
fn test_tcp_columns_by_range_chunked_rpc() {
    // Set up the logging.
    let log_level = "debug";
    let enable_logging = true;
    let _subscriber = build_tracing_subscriber(log_level, enable_logging);

    let messages_to_send = 32;

    let spec = Arc::new(spec_with_all_forks_enabled());
    // DataColumnsByRange is a Fulu (PeerDAS) RPC, so run the test at that fork.
    let current_fork_name = ForkName::Fulu;

    let rt = Arc::new(Runtime::new().unwrap());
    // get sender/receiver
    rt.block_on(async {
        let (mut sender, mut receiver) = common::build_node_pair(
            Arc::downgrade(&rt),
            current_fork_name,
            spec.clone(),
            Protocol::Tcp,
            false,
            None,
        )
        .await;

        // DataColumnsByRange Request: 32 slots, all columns.
        let rpc_request = RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
            start_slot: 320,
            count: 32,
            columns: (0..E::number_of_columns() as u64).collect(),
        });

        // DataColumnsByRange Response
        let data_column = Arc::new(DataColumnSidecar {
            index: 1,
            signed_block_header: SignedBeaconBlockHeader {
                message: BeaconBlockHeader {
                    slot: 320u64.into(),
                    proposer_index: 1,
                    parent_root: Hash256::zero(),
                    state_root: Hash256::zero(),
                    body_root: Hash256::zero(),
                },
                signature: Signature::empty(),
            },
            column: vec![vec![0; E::bytes_per_blob()].into()].into(),
            kzg_commitments: vec![KzgCommitment::empty_for_testing()].into(),
            kzg_proofs: vec![KzgProof::empty()].into(),
            kzg_commitments_inclusion_proof: vec![
                Hash256::zero();
                E::kzg_commitments_inclusion_proof_depth()
            ]
            .into(),
        });

        let rpc_response = Response::DataColumnsByRange(Some(data_column.clone()));

        // keep count of the number of messages received
        let mut messages_received = 0;
        // build the sender future
        let sender_future = async {
            loop {
                match sender.next_event().await {
                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                        tracing::info!("Sending RPC");
                        sender
                            .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
                            .unwrap();
                    }
                    NetworkEvent::ResponseReceived {
                        peer_id: _,
                        app_request_id: AppRequestId::Router,
                        response,
                    } => match response {
                        Response::DataColumnsByRange(Some(sidecar)) => {
                            assert_eq!(sidecar, data_column.clone());
                            messages_received += 1;
                            tracing::info!("Chunk received");
                        }
                        Response::DataColumnsByRange(None) => {
                            // should be exactly messages_to_send
                            assert_eq!(messages_received, messages_to_send);
                            // end the test
                            return;
                        }
                        _ => {} // Ignore other RPC messages
                    },
                    _ => {} // Ignore other behaviour events
                }
            }
        }
        .instrument(info_span!("Sender"));

        // build the receiver future
        let receiver_future = async {
            loop {
                match receiver.next_event().await {
                    NetworkEvent::RequestReceived {
                        peer_id,
                        inbound_request_id,
                        request_type,
                    } => {
                        if request_type == rpc_request {
                            // send the response
                            tracing::info!("Receiver got request");

                            for _ in 0..messages_to_send {
                                receiver.send_response(
                                    peer_id,
                                    inbound_request_id,
                                    rpc_response.clone(),
                                );
                                tracing::info!("Sending message");
                            }
                            // send the stream termination
                            receiver.send_response(
                                peer_id,
                                inbound_request_id,
                                Response::DataColumnsByRange(None),
                            );
                            tracing::info!("Send stream term");
                        }
                    }
                    _ => {} // Ignore other events
                }
            }
        }
        .instrument(info_span!("Receiver"));
        tokio::select! {
            _ = sender_future => {}
            _ = receiver_future => {}
            _ = sleep(Duration::from_secs(300)) => {
                panic!("Future timed out");
            }
        }
    })
}
|
||||
|
||||
// Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received
|
||||
#[test]
|
||||
fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
|
||||
@@ -1981,7 +1981,7 @@ fn max_data_columns_by_root_request_common<E: EthSpec>(max_request_blocks: u64)
|
||||
|
||||
let empty_data_columns_by_root_id = DataColumnsByRootIdentifier {
|
||||
block_root: Hash256::zero(),
|
||||
columns: VariableList::from(vec![0]),
|
||||
columns: VariableList::from(vec![0; E::number_of_columns()]),
|
||||
};
|
||||
|
||||
RuntimeVariableList::<DataColumnsByRootIdentifier<E>>::from_vec(
|
||||
|
||||
Reference in New Issue
Block a user