diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 85d7b2b7d5..fb05ef7552 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -452,7 +452,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(chain, attestation)?; + verify_propagation_slot_range(&chain.slot_clock, attestation)?; // Check the attestation's epoch matches its target. if attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) @@ -716,7 +716,7 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(chain, attestation)?; + verify_propagation_slot_range(&chain.slot_clock, attestation)?; // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one // aggregation bit set. @@ -1019,14 +1019,13 @@ fn verify_head_block_is_known( /// to the current slot of the `chain`. /// /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. -pub fn verify_propagation_slot_range( - chain: &BeaconChain, - attestation: &Attestation, +pub fn verify_propagation_slot_range( + slot_clock: &S, + attestation: &Attestation, ) -> Result<(), Error> { let attestation_slot = attestation.data.slot; - let latest_permissible_slot = chain - .slot_clock + let latest_permissible_slot = slot_clock .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)?; if attestation_slot > latest_permissible_slot { @@ -1037,11 +1036,10 @@ pub fn verify_propagation_slot_range( } // Taking advantage of saturating subtraction on `Slot`. - let earliest_permissible_slot = chain - .slot_clock + let earliest_permissible_slot = slot_clock .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)? - - T::EthSpec::slots_per_epoch(); + - E::slots_per_epoch(); if attestation_slot < earliest_permissible_slot { return Err(Error::PastSlot { attestation_slot, diff --git a/beacon_node/beacon_chain/src/sync_committee_verification.rs b/beacon_node/beacon_chain/src/sync_committee_verification.rs index 4bc5b439e1..fa7d4dcfed 100644 --- a/beacon_node/beacon_chain/src/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/src/sync_committee_verification.rs @@ -273,7 +273,7 @@ impl VerifiedSyncContribution { let subcommittee_index = contribution.subcommittee_index as usize; // Ensure sync committee contribution is within the MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance. - verify_propagation_slot_range(chain, contribution)?; + verify_propagation_slot_range(&chain.slot_clock, contribution)?; // Validate subcommittee index. if contribution.subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT { @@ -428,7 +428,7 @@ impl VerifiedSyncCommitteeMessage { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future sync committee messages for later processing. - verify_propagation_slot_range(chain, &sync_message)?; + verify_propagation_slot_range(&chain.slot_clock, &sync_message)?; // Ensure the `subnet_id` is valid for the given validator. let pubkey = chain @@ -516,14 +516,13 @@ impl VerifiedSyncCommitteeMessage { /// to the current slot of the `chain`. /// /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. -pub fn verify_propagation_slot_range( - chain: &BeaconChain, +pub fn verify_propagation_slot_range( + slot_clock: &S, sync_contribution: &U, ) -> Result<(), Error> { let message_slot = sync_contribution.get_slot(); - let latest_permissible_slot = chain - .slot_clock + let latest_permissible_slot = slot_clock .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)?; if message_slot > latest_permissible_slot { @@ -533,8 +532,7 @@ pub fn verify_propagation_slot_range( }); } - let earliest_permissible_slot = chain - .slot_clock + let earliest_permissible_slot = slot_clock .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)?; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index d18c96c0a7..1b7ef7aa9b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -2,9 +2,9 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::store::Error; use beacon_chain::{ - attestation_verification::{Error as AttnError, VerifiedAttestation}, + attestation_verification::{self, Error as AttnError, VerifiedAttestation}, observed_operations::ObservationOutcome, - sync_committee_verification::Error as SyncCommitteeError, + sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::get_block_delay_ms, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError, ForkChoiceError, GossipVerifiedBlock, @@ -19,7 +19,7 @@ use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, - SubnetId, SyncCommitteeMessage, SyncSubnetId, + Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use super::{ @@ -100,12 +100,7 @@ enum FailedAtt { impl FailedAtt { pub fn beacon_block_root(&self) -> &Hash256 { - match self { - FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root, - FailedAtt::Aggregate { attestation, .. } => { - &attestation.message.aggregate.data.beacon_block_root - } - } + &self.attestation().data.beacon_block_root } pub fn kind(&self) -> &'static str { @@ -114,6 +109,13 @@ impl FailedAtt { FailedAtt::Aggregate { .. } => "aggregated", } } + + pub fn attestation(&self) -> &Attestation { + match self { + FailedAtt::Unaggregate { attestation, .. } => attestation, + FailedAtt::Aggregate { attestation, .. } => &attestation.message.aggregate, + } + } } /// Items required to verify a batch of unaggregated gossip attestations. @@ -410,6 +412,7 @@ impl Worker { }, reprocess_tx, error, + seen_timestamp, ); } } @@ -608,6 +611,7 @@ impl Worker { }, reprocess_tx, error, + seen_timestamp, ); } } @@ -1117,6 +1121,7 @@ impl Worker { subnet_id: SyncSubnetId, seen_timestamp: Duration, ) { + let message_slot = sync_signature.slot; let sync_signature = match self .chain .verify_sync_committee_message_for_gossip(sync_signature, subnet_id) @@ -1128,6 +1133,8 @@ impl Worker { message_id, "sync_signature", e, + message_slot, + seen_timestamp, ); return; } @@ -1177,6 +1184,7 @@ impl Worker { sync_contribution: SignedContributionAndProof, seen_timestamp: Duration, ) { + let contribution_slot = sync_contribution.message.contribution.slot; let sync_contribution = match self .chain .verify_sync_contribution_for_gossip(sync_contribution) @@ -1189,6 +1197,8 @@ impl Worker { message_id, "sync_contribution", e, + contribution_slot, + seen_timestamp, ); return; } @@ -1232,6 +1242,7 @@ impl Worker { failed_att: FailedAtt, reprocess_tx: Option>>, error: AttnError, + seen_timestamp: Duration, ) { let beacon_block_root = failed_att.beacon_block_root(); let attestation_type = failed_att.kind(); @@ -1239,8 +1250,7 @@ impl Worker { match &error { AttnError::FutureEpoch { .. } | AttnError::PastEpoch { .. } - | AttnError::FutureSlot { .. } - | AttnError::PastSlot { .. } => { + | AttnError::FutureSlot { .. } => { /* * These errors can be triggered by a mismatch between our slot and the peer. * @@ -1262,6 +1272,24 @@ impl Worker { // Do not propagate these messages. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } + AttnError::PastSlot { .. } => { + // Produce a slot clock frozen at the time we received the message from the + // network. + let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp); + let hindsight_verification = + attestation_verification::verify_propagation_slot_range( + seen_clock, + failed_att.attestation(), + ); + + // Only penalize the peer if it would have been invalid at the moment we received + // it. + if hindsight_verification.is_err() { + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); + } + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { /* * These errors are caused by invalid signatures. @@ -1625,6 +1653,8 @@ impl Worker { message_id: MessageId, message_type: &str, error: SyncCommitteeError, + sync_committee_message_slot: Slot, + seen_timestamp: Duration, ) { metrics::register_sync_committee_error(&error); @@ -1650,10 +1680,7 @@ impl Worker { // Do not propagate these messages. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } - SyncCommitteeError::PastSlot { - message_slot, - earliest_permissible_slot, - } => { + SyncCommitteeError::PastSlot { .. } => { /* * This error can be triggered by a mismatch between our slot and the peer. * @@ -1667,12 +1694,34 @@ impl Worker { "type" => ?message_type, ); - // We tolerate messages that were just one slot late. - if *message_slot + 1 < *earliest_permissible_slot { + // Compute the slot when we received the message. + let received_slot = self + .chain + .slot_clock + .slot_of(seen_timestamp) + .unwrap_or_else(|| self.chain.slot_clock.genesis_slot()); + + // The message is "excessively" late if it was more than one slot late. + let excessively_late = received_slot > sync_committee_message_slot + 1; + + // This closure will lazily produce a slot clock frozen at the time we received the + // message from the network and return a bool indicating if the message was invalid + // at the time of receipt too. + let invalid_in_hindsight = || { + let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp); + let hindsight_verification = + sync_committee_verification::verify_propagation_slot_range( + seen_clock, + &sync_committee_message_slot, + ); + hindsight_verification.is_err() + }; + + // Penalize the peer if the message was more than one slot late + if excessively_late && invalid_in_hindsight() { self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); } - // Do not propagate these messages. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } SyncCommitteeError::EmptyAggregationBitfield => { diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index f50931c6f6..183f5c9313 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -112,4 +112,18 @@ pub trait SlotClock: Send + Sync + Sized + Clone { Duration::from_secs(duration_into_slot.as_secs() % seconds_per_slot) }) } + + /// Produces a *new* slot clock with the same configuration of `self`, except that clock is + /// "frozen" at the `freeze_at` time. + /// + /// This is useful for observing the slot clock at arbitrary fixed points in time. + fn freeze_at(&self, freeze_at: Duration) -> ManualSlotClock { + let slot_clock = ManualSlotClock::new( + self.genesis_slot(), + self.genesis_duration(), + self.slot_duration(), + ); + slot_clock.set_current_time(freeze_at); + slot_clock + } }