mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
Merge remote-tracking branch 'origin/release-v8.1' into unstable
This commit is contained in:
@@ -457,6 +457,9 @@ fn handle_error<T>(
|
|||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// All snappy errors from the snap crate bubble up as `Other` kind errors
|
||||||
|
// that imply invalid response
|
||||||
|
ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())),
|
||||||
_ => Err(RPCError::from(err)),
|
_ => Err(RPCError::from(err)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2317,4 +2320,43 @@ mod tests {
|
|||||||
RPCError::InvalidData(_)
|
RPCError::InvalidData(_)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test invalid snappy response.
|
||||||
|
#[test]
|
||||||
|
fn test_invalid_snappy_response() {
|
||||||
|
let spec = spec_with_all_forks_enabled();
|
||||||
|
let fork_ctx = Arc::new(fork_context(ForkName::latest(), &spec));
|
||||||
|
let max_packet_size = spec.max_payload_size as usize; // 10 MiB.
|
||||||
|
|
||||||
|
let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy);
|
||||||
|
|
||||||
|
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
protocol.clone(),
|
||||||
|
max_packet_size,
|
||||||
|
fork_ctx.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut payload = BytesMut::new();
|
||||||
|
payload.extend_from_slice(&[0u8]);
|
||||||
|
let deneb_epoch = spec.deneb_fork_epoch.unwrap();
|
||||||
|
payload.extend_from_slice(&fork_ctx.context_bytes(deneb_epoch));
|
||||||
|
|
||||||
|
// Claim the MAXIMUM allowed size (10 MiB)
|
||||||
|
let claimed_size = max_packet_size;
|
||||||
|
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||||
|
uvi_codec.encode(claimed_size, &mut payload).unwrap();
|
||||||
|
payload.extend_from_slice(&[0xBB; 16]); // Junk snappy.
|
||||||
|
|
||||||
|
let result = codec.decode(&mut payload);
|
||||||
|
|
||||||
|
assert!(result.is_err(), "Expected decode to fail");
|
||||||
|
|
||||||
|
// IoError = reached snappy decode (allocation happened).
|
||||||
|
let err = result.unwrap_err();
|
||||||
|
assert!(
|
||||||
|
matches!(err, RPCError::InvalidData(_)),
|
||||||
|
"Should return invalid data variant {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -977,7 +977,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// remove all skip slots i.e. duplicated roots
|
// remove all skip slots i.e. duplicated roots
|
||||||
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
|
Ok(block_roots
|
||||||
|
.into_iter()
|
||||||
|
.unique_by(|(root, _)| *root)
|
||||||
|
.collect::<Vec<_>>())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a `BlobsByRange` request from the peer.
|
/// Handle a `BlobsByRange` request from the peer.
|
||||||
|
|||||||
@@ -120,6 +120,39 @@ impl TestRig {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn new_with_skip_slots(chain_length: u64, skip_slots: &HashSet<u64>) -> Self {
|
||||||
|
let mut spec = test_spec::<E>();
|
||||||
|
spec.shard_committee_period = 2;
|
||||||
|
let spec = Arc::new(spec);
|
||||||
|
let beacon_processor_config = BeaconProcessorConfig::default();
|
||||||
|
let harness = BeaconChainHarness::builder(MainnetEthSpec)
|
||||||
|
.spec(spec.clone())
|
||||||
|
.deterministic_keypairs(VALIDATOR_COUNT)
|
||||||
|
.fresh_ephemeral_store()
|
||||||
|
.mock_execution_layer()
|
||||||
|
.node_custody_type(NodeCustodyType::Fullnode)
|
||||||
|
.chain_config(<_>::default())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
harness.advance_slot();
|
||||||
|
|
||||||
|
for slot in 1..=chain_length {
|
||||||
|
if !skip_slots.contains(&slot) {
|
||||||
|
harness
|
||||||
|
.extend_chain(
|
||||||
|
1,
|
||||||
|
BlockStrategy::OnCanonicalHead,
|
||||||
|
AttestationStrategy::AllValidators,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness.advance_slot();
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn new_parametric(
|
pub async fn new_parametric(
|
||||||
chain_length: u64,
|
chain_length: u64,
|
||||||
beacon_processor_config: BeaconProcessorConfig,
|
beacon_processor_config: BeaconProcessorConfig,
|
||||||
@@ -150,6 +183,14 @@ impl TestRig {
|
|||||||
harness.advance_slot();
|
harness.advance_slot();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn from_harness(
|
||||||
|
harness: BeaconChainHarness<T>,
|
||||||
|
beacon_processor_config: BeaconProcessorConfig,
|
||||||
|
spec: Arc<ChainSpec>,
|
||||||
|
) -> Self {
|
||||||
let head = harness.chain.head_snapshot();
|
let head = harness.chain.head_snapshot();
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -1986,3 +2027,78 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
|
|||||||
"Should have received at least some data columns"
|
"Should have received at least some data columns"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test that DataColumnsByRange does not return duplicate data columns for skip slots.
|
||||||
|
///
|
||||||
|
/// When skip slots occur, `forwards_iter_block_roots` returns the same block root for
|
||||||
|
/// consecutive slots. The deduplication in `get_block_roots_from_store` must use
|
||||||
|
/// `unique_by` on the root (not the full `(root, slot)` tuple) to avoid serving
|
||||||
|
/// duplicate data columns for the same block.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
|
||||||
|
if test_spec::<E>().fulu_fork_epoch.is_none() {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
|
||||||
|
// After 4 epochs, finalized_epoch=2 (finalized_slot=64). Requesting slots 0-9
|
||||||
|
// satisfies req_start_slot + req_count <= finalized_slot (10 <= 64), which routes
|
||||||
|
// through `get_block_roots_from_store` — the code path with the bug.
|
||||||
|
let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
|
||||||
|
let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;
|
||||||
|
|
||||||
|
let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0)));
|
||||||
|
let requested_column = vec![all_custody_columns[0]];
|
||||||
|
|
||||||
|
// Request a range that spans the skip slots (slots 0 through 9).
|
||||||
|
let start_slot = 0;
|
||||||
|
let slot_count = 10;
|
||||||
|
|
||||||
|
rig.network_beacon_processor
|
||||||
|
.send_data_columns_by_range_request(
|
||||||
|
PeerId::random(),
|
||||||
|
InboundRequestId::new_unchecked(42, 24),
|
||||||
|
DataColumnsByRangeRequest {
|
||||||
|
start_slot,
|
||||||
|
count: slot_count,
|
||||||
|
columns: requested_column.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Collect block roots from all data column responses.
|
||||||
|
let mut block_roots: Vec<Hash256> = Vec::new();
|
||||||
|
|
||||||
|
while let Some(next) = rig.network_rx.recv().await {
|
||||||
|
if let NetworkMessage::SendResponse {
|
||||||
|
peer_id: _,
|
||||||
|
response: Response::DataColumnsByRange(data_column),
|
||||||
|
inbound_request_id: _,
|
||||||
|
} = next
|
||||||
|
{
|
||||||
|
if let Some(column) = data_column {
|
||||||
|
block_roots.push(column.block_root());
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
panic!("unexpected message {:?}", next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!block_roots.is_empty(),
|
||||||
|
"Should have received at least some data columns"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Before the fix, skip slots caused the same block root to appear multiple times
|
||||||
|
// (once per skip slot) because .unique() on (Hash256, Slot) tuples didn't deduplicate.
|
||||||
|
let unique_roots: HashSet<_> = block_roots.iter().collect();
|
||||||
|
assert_eq!(
|
||||||
|
block_roots.len(),
|
||||||
|
unique_roots.len(),
|
||||||
|
"Response contained duplicate block roots: got {} columns but only {} unique roots",
|
||||||
|
block_roots.len(),
|
||||||
|
unique_roots.len(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|||||||
@@ -77,8 +77,6 @@ const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
|
|||||||
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
|
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
|
||||||
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
|
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
// Generally the timeout for events should be longer than a slot.
|
|
||||||
const HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER: u32 = 50;
|
|
||||||
const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
|
|
||||||
/// A struct to define a variety of different timeouts for different validator tasks to ensure
|
/// A struct to define a variety of different timeouts for different validator tasks to ensure
|
||||||
@@ -99,7 +97,6 @@ pub struct Timeouts {
|
|||||||
pub get_debug_beacon_states: Duration,
|
pub get_debug_beacon_states: Duration,
|
||||||
pub get_deposit_snapshot: Duration,
|
pub get_deposit_snapshot: Duration,
|
||||||
pub get_validator_block: Duration,
|
pub get_validator_block: Duration,
|
||||||
pub events: Duration,
|
|
||||||
pub default: Duration,
|
pub default: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,7 +117,6 @@ impl Timeouts {
|
|||||||
get_debug_beacon_states: timeout,
|
get_debug_beacon_states: timeout,
|
||||||
get_deposit_snapshot: timeout,
|
get_deposit_snapshot: timeout,
|
||||||
get_validator_block: timeout,
|
get_validator_block: timeout,
|
||||||
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * timeout,
|
|
||||||
default: timeout,
|
default: timeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -143,7 +139,6 @@ impl Timeouts {
|
|||||||
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
||||||
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
|
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
|
||||||
get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
|
get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
|
||||||
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * base_timeout,
|
|
||||||
default: base_timeout / HTTP_DEFAULT_TIMEOUT_QUOTIENT,
|
default: base_timeout / HTTP_DEFAULT_TIMEOUT_QUOTIENT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -3079,10 +3074,14 @@ impl BeaconNodeHttpClient {
|
|||||||
.join(",");
|
.join(",");
|
||||||
path.query_pairs_mut().append_pair("topics", &topic_string);
|
path.query_pairs_mut().append_pair("topics", &topic_string);
|
||||||
|
|
||||||
|
// Do not use a timeout for the events endpoint. Using a regular timeout will trigger a
|
||||||
|
// timeout every `timeout` seconds, regardless of any data streamed from the endpoint.
|
||||||
|
// In future we could add a read_timeout, but that can only be configured globally on the
|
||||||
|
// Client.
|
||||||
let mut es = self
|
let mut es = self
|
||||||
.client
|
.client
|
||||||
.get(path)
|
.get(path)
|
||||||
.timeout(self.timeouts.events)
|
.timeout(Duration::MAX)
|
||||||
.eventsource()
|
.eventsource()
|
||||||
.map_err(Error::SseEventSource)?;
|
.map_err(Error::SseEventSource)?;
|
||||||
// If we don't await `Event::Open` here, then the consumer
|
// If we don't await `Event::Open` here, then the consumer
|
||||||
|
|||||||
Reference in New Issue
Block a user