From 2bd5bbdffb87fde48f45b9e7e681780324e03b2b Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sat, 8 Feb 2025 10:18:57 +1100 Subject: [PATCH] Optimise and refine `SingleAttestation` conversion (#6934) Closes - https://github.com/sigp/lighthouse/issues/6805 - Use a new `WorkEvent::GossipAttestationToConvert` to handle the conversion from `SingleAttestation` to `Attestation` _on_ the beacon processor (prevents a Tokio thread being blocked). - Improve the error handling for single attestations. I think previously we had no ability to reprocess single attestations for unknown blocks -- we would just error. This seemed to be the case in both gossip processing and processing of `SingleAttestation`s from the HTTP API. - Move the `SingleAttestation -> Attestation` conversion function into `beacon_chain` so that it can return the `attestation_verification::Error` type, which has well-defined error handling and peer penalties. The now-unused variants of `types::Attestation::Error` have been removed. --- .../src/attestation_verification.rs | 33 ++- beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain/src/single_attestation.rs | 46 ++++ beacon_node/beacon_chain/src/test_utils.rs | 4 +- beacon_node/beacon_processor/src/lib.rs | 45 +++- .../http_api/src/publish_attestations.rs | 51 ++-- .../gossip_methods.rs | 219 ++++++++++++++++-- .../src/network_beacon_processor/mod.rs | 81 +++---- consensus/types/src/attestation.rs | 42 +--- .../types/src/sync_committee_contribution.rs | 2 - 10 files changed, 379 insertions(+), 145 deletions(-) create mode 100644 beacon_node/beacon_chain/src/single_attestation.rs diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index a69eb99a51..a70a2caa4f 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -60,9 +60,9 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, - CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, - SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, + Attestation, AttestationData, AttestationRef, BeaconCommittee, + BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, + IndexedAttestation, SelectionProof, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; @@ -115,6 +115,17 @@ pub enum Error { /// /// The peer has sent an invalid message. AggregatorNotInCommittee { aggregator_index: u64 }, + /// The `attester_index` for a `SingleAttestation` is not a member of the committee defined + /// by its `beacon_block_root`, `committee_index` and `slot`. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + AttesterNotInCommittee { + attester_index: u64, + committee_index: u64, + slot: Slot, + }, /// The aggregator index refers to a validator index that we have not seen. /// /// ## Peer scoring @@ -485,7 +496,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; + verify_propagation_slot_range::<_, T::EthSpec>( + &chain.slot_clock, + attestation.data(), + &chain.spec, + )?; // Check the attestation's epoch matches its target. if attestation.data().slot.epoch(T::EthSpec::slots_per_epoch()) @@ -817,7 +832,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; + verify_propagation_slot_range::<_, T::EthSpec>( + &chain.slot_clock, + attestation.data(), + &chain.spec, + )?; // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one // aggregation bit set. @@ -1133,10 +1152,10 @@ fn verify_head_block_is_known( /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. pub fn verify_propagation_slot_range( slot_clock: &S, - attestation: AttestationRef, + attestation: &AttestationData, spec: &ChainSpec, ) -> Result<(), Error> { - let attestation_slot = attestation.data().slot; + let attestation_slot = attestation.slot; let latest_permissible_slot = slot_clock .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 456b3c0dd8..48168aeb02 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -54,6 +54,7 @@ mod pre_finalization_cache; pub mod proposer_prep_service; pub mod schema_change; pub mod shuffling_cache; +pub mod single_attestation; pub mod state_advance_timer; pub mod sync_committee_rewards; pub mod sync_committee_verification; diff --git a/beacon_node/beacon_chain/src/single_attestation.rs b/beacon_node/beacon_chain/src/single_attestation.rs new file mode 100644 index 0000000000..fa4f98bb07 --- /dev/null +++ b/beacon_node/beacon_chain/src/single_attestation.rs @@ -0,0 +1,46 @@ +use crate::attestation_verification::Error; +use types::{Attestation, AttestationElectra, BitList, BitVector, EthSpec, SingleAttestation}; + +pub fn single_attestation_to_attestation( + single_attestation: &SingleAttestation, + committee: &[usize], +) -> Result, Error> { + let attester_index = single_attestation.attester_index; + let committee_index = single_attestation.committee_index; + let slot = single_attestation.data.slot; + + let aggregation_bit = committee + .iter() + .enumerate() + .find_map(|(i, &validator_index)| { + if attester_index as usize == validator_index { + return Some(i); + } + None + }) + .ok_or(Error::AttesterNotInCommittee { + attester_index, + committee_index, + slot, + })?; + + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(committee_index as usize, true) + .map_err(|e| Error::Invalid(e.into()))?; + + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; + aggregation_bits + .set(aggregation_bit, true) + .map_err(|e| Error::Invalid(e.into()))?; + + // TODO(electra): consider eventually allowing conversion to non-Electra attestations as well + // to maintain invertability (`Attestation` -> `SingleAttestation` -> `Attestation`). + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: single_attestation.data.clone(), + signature: single_attestation.signature.clone(), + })) +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 4526b2b360..e61146bfc8 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -7,6 +7,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, + single_attestation::single_attestation_to_attestation, sync_committee_verification::Error as SyncCommitteeError, validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, @@ -1133,7 +1134,8 @@ where let single_attestation = attestation.to_single_attestation_with_attester_index(attester_index as u64)?; - let attestation: Attestation = single_attestation.to_attestation(committee.committee)?; + let attestation: Attestation = + single_attestation_to_attestation(&single_attestation, committee.committee).unwrap(); assert_eq!( single_attestation.committee_index, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 92f4636c95..2743f93bb3 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -62,9 +62,9 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use types::{ - Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId, + Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, + SingleAttestation, Slot, SubnetId, }; -use types::{EthSpec, Slot}; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, @@ -504,10 +504,10 @@ impl From for WorkEvent { /// Items required to verify a batch of unaggregated gossip attestations. #[derive(Debug)] -pub struct GossipAttestationPackage { +pub struct GossipAttestationPackage { pub message_id: MessageId, pub peer_id: PeerId, - pub attestation: Box>, + pub attestation: Box, pub subnet_id: SubnetId, pub should_import: bool, pub seen_timestamp: Duration, @@ -549,21 +549,32 @@ pub enum BlockingOrAsync { Blocking(BlockingFn), Async(AsyncFn), } +pub type GossipAttestationBatch = Vec>>; /// Indicates the type of work to be performed and therefore its priority and /// queuing specifics. pub enum Work { GossipAttestation { - attestation: Box>, - process_individual: Box) + Send + Sync>, - process_batch: Box>) + Send + Sync>, + attestation: Box>>, + process_individual: Box>) + Send + Sync>, + process_batch: Box) + Send + Sync>, + }, + // Attestation requiring conversion before processing. + // + // For now this is a `SingleAttestation`, but eventually we will switch this around so that + // legacy `Attestation`s are converted and the main processing pipeline operates on + // `SingleAttestation`s. + GossipAttestationToConvert { + attestation: Box>, + process_individual: + Box) + Send + Sync>, }, UnknownBlockAttestation { process_fn: BlockingFn, }, GossipAttestationBatch { - attestations: Vec>, - process_batch: Box>) + Send + Sync>, + attestations: GossipAttestationBatch, + process_batch: Box) + Send + Sync>, }, GossipAggregate { aggregate: Box>, @@ -639,6 +650,7 @@ impl fmt::Debug for Work { #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, + GossipAttestationToConvert, UnknownBlockAttestation, GossipAttestationBatch, GossipAggregate, @@ -690,6 +702,7 @@ impl Work { fn to_type(&self) -> WorkType { match self { Work::GossipAttestation { .. } => WorkType::GossipAttestation, + Work::GossipAttestationToConvert { .. } => WorkType::GossipAttestationToConvert, Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch, Work::GossipAggregate { .. } => WorkType::GossipAggregate, Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch, @@ -849,6 +862,7 @@ impl BeaconProcessor { let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); let mut aggregate_debounce = TimeLatch::default(); let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); + let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue); let mut attestation_debounce = TimeLatch::default(); let mut unknown_block_aggregate_queue = LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); @@ -1180,6 +1194,9 @@ impl BeaconProcessor { None } } + // Convert any gossip attestations that need to be converted. + } else if let Some(item) = attestation_to_convert_queue.pop() { + Some(item) // Check sync committee messages after attestations as their rewards are lesser // and they don't influence fork choice. } else if let Some(item) = sync_contribution_queue.pop() { @@ -1301,6 +1318,9 @@ impl BeaconProcessor { match work { _ if can_spawn => self.spawn_worker(work, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), + Work::GossipAttestationToConvert { .. } => { + attestation_to_convert_queue.push(work) + } // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( @@ -1431,6 +1451,7 @@ impl BeaconProcessor { if let Some(modified_queue_id) = modified_queue_id { let queue_len = match modified_queue_id { WorkType::GossipAttestation => attestation_queue.len(), + WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), WorkType::GossipAttestationBatch => 0, // No queue WorkType::GossipAggregate => aggregate_queue.len(), @@ -1563,6 +1584,12 @@ impl BeaconProcessor { } => task_spawner.spawn_blocking(move || { process_individual(*attestation); }), + Work::GossipAttestationToConvert { + attestation, + process_individual, + } => task_spawner.spawn_blocking(move || { + process_individual(*attestation); + }), Work::GossipAttestationBatch { attestations, process_batch, diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 1b9949d4d5..10d13e09a5 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -36,8 +36,8 @@ //! attestations and there's no immediate cause for concern. use crate::task_spawner::{Priority, TaskSpawner}; use beacon_chain::{ - validator_monitor::timestamp_now, AttestationError, BeaconChain, BeaconChainError, - BeaconChainTypes, + single_attestation::single_attestation_to_attestation, validator_monitor::timestamp_now, + AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; use either::Either; @@ -183,10 +183,10 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>( chain: &Arc>, attestation: &'a Either, SingleAttestation>, ) -> Result>, Error> { - let a = match attestation { - Either::Left(a) => Cow::Borrowed(a), - Either::Right(single_attestation) => chain - .with_committee_cache( + match attestation { + Either::Left(a) => Ok(Cow::Borrowed(a)), + Either::Right(single_attestation) => { + let conversion_result = chain.with_committee_cache( single_attestation.data.target.root, single_attestation .data @@ -197,24 +197,33 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>( single_attestation.data.slot, single_attestation.committee_index, ) else { - return Err(BeaconChainError::AttestationError( - types::AttestationError::NoCommitteeForSlotAndIndex { - slot: single_attestation.data.slot, - index: single_attestation.committee_index, - }, - )); + return Ok(Err(AttestationError::NoCommitteeForSlotAndIndex { + slot: single_attestation.data.slot, + index: single_attestation.committee_index, + })); }; - let attestation = - single_attestation.to_attestation::(committee.committee)?; - - Ok(Cow::Owned(attestation)) + Ok(single_attestation_to_attestation::( + single_attestation, + committee.committee, + ) + .map(Cow::Owned)) }, - ) - .map_err(Error::FailedConversion)?, - }; - - Ok(a) + ); + match conversion_result { + Ok(Ok(attestation)) => Ok(attestation), + Ok(Err(e)) => Err(Error::Validation(e)), + // Map the error returned by `with_committee_cache` for unknown blocks into the + // `UnknownHeadBlock` error that is gracefully handled. + Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { + Err(Error::Validation(AttestationError::UnknownHeadBlock { + beacon_block_root, + })) + } + Err(e) => Err(Error::FailedConversion(e)), + } + } + } } pub async fn publish_attestations( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index dc8d32800e..090b963cbc 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -14,6 +14,7 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, + single_attestation::single_attestation_to_attestation, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -32,12 +33,12 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ - beacon_block::BlockImportSource, Attestation, AttestationRef, AttesterSlashing, BlobSidecar, - DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef, + AttesterSlashing, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, + IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, - SyncSubnetId, + SignedContributionAndProof, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + SyncCommitteeMessage, SyncSubnetId, }; use beacon_processor::{ @@ -45,7 +46,7 @@ use beacon_processor::{ QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, ReprocessQueueMessage, }, - DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, + DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, }; /// Set to `true` to introduce stricter penalties for peers who send some types of late consensus @@ -127,6 +128,11 @@ enum FailedAtt { should_import: bool, seen_timestamp: Duration, }, + // This variant is just a dummy variant for now, as SingleAttestation reprocessing is handled + // separately. + SingleUnaggregate { + attestation: Box, + }, Aggregate { attestation: Box>, seen_timestamp: Duration, @@ -135,20 +141,22 @@ enum FailedAtt { impl FailedAtt { pub fn beacon_block_root(&self) -> &Hash256 { - &self.attestation().data().beacon_block_root + &self.attestation_data().beacon_block_root } pub fn kind(&self) -> &'static str { match self { FailedAtt::Unaggregate { .. } => "unaggregated", + FailedAtt::SingleUnaggregate { .. } => "unaggregated", FailedAtt::Aggregate { .. } => "aggregated", } } - pub fn attestation(&self) -> AttestationRef { + pub fn attestation_data(&self) -> &AttestationData { match self { - FailedAtt::Unaggregate { attestation, .. } => attestation.to_ref(), - FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate(), + FailedAtt::Unaggregate { attestation, .. } => attestation.data(), + FailedAtt::SingleUnaggregate { attestation, .. } => &attestation.data, + FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate().data(), } } } @@ -229,7 +237,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_attestation_batch( self: Arc, - packages: Vec>, + packages: GossipAttestationBatch, reprocess_tx: Option>, ) { let attestations_and_subnets = packages @@ -399,6 +407,155 @@ impl NetworkBeaconProcessor { } } + /// Process an unaggregated attestation requiring conversion. + /// + /// This function performs the conversion, and if successfull queues a new message to be + /// processed by `process_gossip_attestation`. If unsuccessful due to block unavailability, + /// a retry message will be pushed to the `reprocess_tx` if it is `Some`. + #[allow(clippy::too_many_arguments)] + pub fn process_gossip_attestation_to_convert( + self: Arc, + message_id: MessageId, + peer_id: PeerId, + single_attestation: Box, + subnet_id: SubnetId, + should_import: bool, + reprocess_tx: Option>, + seen_timestamp: Duration, + ) { + let conversion_result = self.chain.with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let slot = single_attestation.data.slot; + let committee_index = single_attestation.committee_index; + let Some(committee) = committee_cache.get_beacon_committee(slot, committee_index) + else { + return Ok(Err(AttnError::NoCommitteeForSlotAndIndex { + slot, + index: committee_index, + })); + }; + + Ok(single_attestation_to_attestation( + &single_attestation, + committee.committee, + )) + }, + ); + + match conversion_result { + Ok(Ok(attestation)) => { + let slot = attestation.data().slot; + if let Err(e) = self.send_unaggregated_attestation( + message_id.clone(), + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + ) { + error!( + &self.log, + "Unable to queue converted SingleAttestation"; + "error" => %e, + "slot" => slot, + ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + } + // Outermost error (from `with_committee_cache`) indicating that the block is not known + // and that this conversion should be retried. + Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { + if let Some(sender) = reprocess_tx { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. + self.sync_tx + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + beacon_block_root, + )) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + let processor = self.clone(); + // Do not allow this attestation to be re-processed beyond this point. + let reprocess_msg = + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_attestation_to_convert( + message_id, + peer_id, + single_attestation, + subnet_id, + should_import, + None, + seen_timestamp, + ) + }), + }); + if sender.try_send(reprocess_msg).is_err() { + error!( + self.log, + "Failed to send attestation for re-processing"; + ) + } + } else { + // We shouldn't make any further attempts to process this attestation. + // + // Don't downscore the peer since it's not clear if we requested this head + // block from them or not. + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + } + Ok(Err(error)) => { + // We already handled reprocessing above so do not attempt it in the error handler. + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::SingleUnaggregate { + attestation: single_attestation, + }, + None, + error, + seen_timestamp, + ); + } + Err(error) => { + // We already handled reprocessing above so do not attempt it in the error handler. + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::SingleUnaggregate { + attestation: single_attestation, + }, + None, + AttnError::BeaconChainError(error), + seen_timestamp, + ); + } + } + } + /// Process the aggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -2207,9 +2364,9 @@ impl NetworkBeaconProcessor { // network. let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp); let hindsight_verification = - attestation_verification::verify_propagation_slot_range( + attestation_verification::verify_propagation_slot_range::<_, T::EthSpec>( seen_clock, - failed_att.attestation(), + failed_att.attestation_data(), &self.chain.spec, ); @@ -2294,6 +2451,19 @@ impl NetworkBeaconProcessor { "attn_agg_not_in_committee", ); } + AttnError::AttesterNotInCommittee { .. } => { + /* + * `SingleAttestation` from a validator is invalid because the `attester_index` is + * not in the claimed committee. There is no reason a non-faulty validator would + * send this message. + */ + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "attn_single_not_in_committee", + ); + } AttnError::AttestationSupersetKnown { .. } => { /* * The aggregate attestation has already been observed on the network or in @@ -2439,6 +2609,17 @@ impl NetworkBeaconProcessor { }), }) } + FailedAtt::SingleUnaggregate { .. } => { + // This should never happen, as we handle the unknown head block case + // for `SingleAttestation`s separately and should not be able to hit + // an `UnknownHeadBlock` error. + error!( + self.log, + "Dropping SingleAttestation instead of requeueing"; + "block_root" => ?beacon_block_root, + ); + return; + } FailedAtt::Unaggregate { attestation, subnet_id, @@ -2661,7 +2842,7 @@ impl NetworkBeaconProcessor { self.log, "Ignored attestation to finalized block"; "block_root" => ?beacon_block_root, - "attestation_slot" => failed_att.attestation().data().slot, + "attestation_slot" => failed_att.attestation_data().slot, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); @@ -2684,9 +2865,9 @@ impl NetworkBeaconProcessor { debug!( self.log, "Dropping attestation"; - "target_root" => ?failed_att.attestation().data().target.root, + "target_root" => ?failed_att.attestation_data().target.root, "beacon_block_root" => ?beacon_block_root, - "slot" => ?failed_att.attestation().data().slot, + "slot" => ?failed_att.attestation_data().slot, "type" => ?attestation_type, "error" => ?e, "peer_id" => % peer_id @@ -2705,7 +2886,7 @@ impl NetworkBeaconProcessor { self.log, "Unable to validate attestation"; "beacon_block_root" => ?beacon_block_root, - "slot" => ?failed_att.attestation().data().slot, + "slot" => ?failed_att.attestation_data().slot, "type" => ?attestation_type, "peer_id" => %peer_id, "error" => ?e, @@ -3106,9 +3287,9 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, ) { - let is_timely = attestation_verification::verify_propagation_slot_range( + let is_timely = attestation_verification::verify_propagation_slot_range::<_, T::EthSpec>( &self.chain.slot_clock, - attestation, + attestation.data(), &self.chain.spec, ) .is_ok(); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 5c1d4f24e5..c06a1f6ee3 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -94,46 +94,34 @@ impl NetworkBeaconProcessor { should_import: bool, seen_timestamp: Duration, ) -> Result<(), Error> { - let result = self.chain.with_committee_cache( - single_attestation.data.target.root, - single_attestation - .data - .slot - .epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { - let Some(committee) = committee_cache.get_beacon_committee( - single_attestation.data.slot, - single_attestation.committee_index, - ) else { - warn!( - self.log, - "No beacon committee for slot and index"; - "slot" => single_attestation.data.slot, - "index" => single_attestation.committee_index - ); - return Ok(Ok(())); - }; + let processor = self.clone(); + let process_individual = move |package: GossipAttestationPackage| { + let reprocess_tx = processor.reprocess_tx.clone(); + processor.process_gossip_attestation_to_convert( + package.message_id, + package.peer_id, + package.attestation, + package.subnet_id, + package.should_import, + Some(reprocess_tx), + package.seen_timestamp, + ) + }; - let attestation = single_attestation.to_attestation(committee.committee)?; - - Ok(self.send_unaggregated_attestation( - message_id.clone(), + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work: Work::GossipAttestationToConvert { + attestation: Box::new(GossipAttestationPackage { + message_id, peer_id, - attestation, + attestation: Box::new(single_attestation), subnet_id, should_import, seen_timestamp, - )) + }), + process_individual: Box::new(process_individual), }, - ); - - match result { - Ok(result) => result, - Err(e) => { - warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e); - Ok(()) - } - } + }) } /// Create a new `Work` event for some unaggregated attestation. @@ -148,18 +136,19 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { // Define a closure for processing individual attestations. let processor = self.clone(); - let process_individual = move |package: GossipAttestationPackage| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation( - package.message_id, - package.peer_id, - package.attestation, - package.subnet_id, - package.should_import, - Some(reprocess_tx), - package.seen_timestamp, - ) - }; + let process_individual = + move |package: GossipAttestationPackage>| { + let reprocess_tx = processor.reprocess_tx.clone(); + processor.process_gossip_attestation( + package.message_id, + package.peer_id, + package.attestation, + package.subnet_id, + package.should_import, + Some(reprocess_tx), + package.seen_timestamp, + ) + }; // Define a closure for processing batches of attestations. let processor = self.clone(); diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 276b27b0f8..1485842edb 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -2,7 +2,6 @@ use crate::slot_data::SlotData; use crate::{test_utils::TestRandom, Hash256, Slot}; use crate::{Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; -use safe_arith::ArithError; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::BitVector; @@ -12,22 +11,17 @@ use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; use super::{ - AggregateSignature, AttestationData, BitList, ChainSpec, CommitteeIndex, Domain, EthSpec, Fork, - SecretKey, Signature, SignedRoot, + AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, + Signature, SignedRoot, }; #[derive(Debug, PartialEq)] pub enum Error { SszTypesError(ssz_types::Error), AlreadySigned(usize), - SubnetCountIsZero(ArithError), IncorrectStateVariant, InvalidCommitteeLength, InvalidCommitteeIndex, - AttesterNotInCommittee(u64), - InvalidCommittee, - MissingCommittee, - NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex }, } impl From for Error { @@ -587,38 +581,6 @@ pub struct SingleAttestation { pub signature: AggregateSignature, } -impl SingleAttestation { - pub fn to_attestation(&self, committee: &[usize]) -> Result, Error> { - let aggregation_bit = committee - .iter() - .enumerate() - .find_map(|(i, &validator_index)| { - if self.attester_index as usize == validator_index { - return Some(i); - } - None - }) - .ok_or(Error::AttesterNotInCommittee(self.attester_index))?; - - let mut committee_bits: BitVector = BitVector::default(); - committee_bits - .set(self.committee_index as usize, true) - .map_err(|_| Error::InvalidCommitteeIndex)?; - - let mut aggregation_bits = - BitList::with_capacity(committee.len()).map_err(|_| Error::InvalidCommitteeLength)?; - - aggregation_bits.set(aggregation_bit, true)?; - - Ok(Attestation::Electra(AttestationElectra { - aggregation_bits, - committee_bits, - data: self.data.clone(), - signature: self.signature.clone(), - })) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/sync_committee_contribution.rs b/consensus/types/src/sync_committee_contribution.rs index c348c3e8be..9bae770fe5 100644 --- a/consensus/types/src/sync_committee_contribution.rs +++ b/consensus/types/src/sync_committee_contribution.rs @@ -1,7 +1,6 @@ use super::{AggregateSignature, EthSpec, SignedRoot}; use crate::slot_data::SlotData; use crate::{test_utils::TestRandom, BitVector, Hash256, Slot, SyncCommitteeMessage}; -use safe_arith::ArithError; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; @@ -11,7 +10,6 @@ use tree_hash_derive::TreeHash; pub enum Error { SszTypesError(ssz_types::Error), AlreadySigned(usize), - SubnetCountIsZero(ArithError), } /// An aggregation of `SyncCommitteeMessage`s, used in creating a `SignedContributionAndProof`.