diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 1a14cd2a69..e565f85ad9 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -60,9 +60,9 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - SingleAttestation, Attestation, AttestationRef, BeaconCommittee, - BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, - IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, + Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, + CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, + SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d8da4443fd..9d1797eacf 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -81,13 +81,12 @@ use tokio_stream::{ StreamExt, }; use types::{ - SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation, - AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, CommitteeCache, - ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, - SyncContributionData, + fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, + AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, + ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, + SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, + SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 5fc261438d..8e8d1a3f2f 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -50,7 +50,7 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::{SingleAttestation, Attestation, EthSpec}; +use types::{Attestation, EthSpec, SingleAttestation}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 93607989cd..7ebb6566f4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -39,9 +39,9 @@ use tokio::time::Duration; use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; use types::{ - attestation::AttestationBase, SingleAttestation, AggregateSignature, BitList, - Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, - SelectionProof, SignedRoot, Slot, + attestation::AttestationBase, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash, + Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, SingleAttestation, + Slot, }; type E = MainnetEthSpec; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 96748ae17b..0c63b5d378 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -6,16 +6,15 @@ use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; -use types::SingleAttestation; use types::{ - Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, - ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, - SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, - SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, + Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, + BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, + SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, + SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SingleAttestation, + SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -194,32 +193,32 @@ impl PubsubMessage { ))) } GossipKind::Attestation(subnet_id) => { - let attestation = - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(&fork_name) => { - if fork_name.electra_enabled() { - Attestation::Electra( - AttestationElectra::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } else { - Attestation::Base( - AttestationBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(&fork_name) => { + if fork_name.electra_enabled() { + let single_attestation = + SingleAttestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::SingleAttestation(Box::new(( + *subnet_id, + single_attestation, + )))) + } else { + let attestation = Attestation::Base( + AttestationBase::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } - None => { - return Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )) - } - }; - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) + } + None => Err(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + )), + } } GossipKind::BeaconBlock => { let beacon_block = diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7826807e03..f89241b4ae 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -549,7 +549,23 @@ impl NetworkService { // the attestation, else we just just propagate the Attestation. let should_process = self.subnet_service.should_process_attestation( Subnet::Attestation(subnet_id), - attestation, + attestation.data(), + ); + self.send_to_router(RouterMessage::PubsubMessage( + id, + source, + message, + should_process, + )); + } + PubsubMessage::SingleAttestation(ref subnet_and_attestation) => { + let subnet_id = subnet_and_attestation.0; + let single_attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we should process + // the attestation, else we just just propagate the Attestation. + let should_process = self.subnet_service.should_process_attestation( + Subnet::Attestation(subnet_id), + &single_attestation.data, ); self.send_to_router(RouterMessage::PubsubMessage( id, diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index da1f220f04..33ae567eb3 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -17,7 +17,7 @@ use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDisco use slog::{debug, error, o, warn}; use slot_clock::SlotClock; use types::{ - Attestation, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + AttestationData, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, }; @@ -363,7 +363,7 @@ impl SubnetService { pub fn should_process_attestation( &self, subnet: Subnet, - attestation: &Attestation, + attestation_data: &AttestationData, ) -> bool { // Proposer-only mode does not need to process attestations if self.proposer_only { @@ -374,7 +374,7 @@ impl SubnetService { .map(|tracked_vals| { tracked_vals.contains_key(&ExactSubnet { subnet, - slot: attestation.data().slot, + slot: attestation_data.slot, }) }) .unwrap_or(true) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 2b10af6fd4..af8573a578 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -36,7 +36,6 @@ use std::future::Future; use std::path::PathBuf; use std::time::Duration; use store::fork_versioned_response::ExecutionOptimisticFinalizedForkVersionedResponse; -use types::SingleAttestation; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 803defb31a..ff2ba1af7f 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1160,6 +1160,11 @@ impl EventKind { "attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)), )?)), + "single_attestation" => Ok(EventKind::SingleAttestation( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("SingleAttestation: {:?}", e)) + })?, + )), "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 1e519034f4..161621174a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -116,7 +116,7 @@ pub use crate::aggregate_and_proof::{ }; pub use crate::attestation::{ Attestation, AttestationBase, AttestationElectra, AttestationRef, AttestationRefMut, - Error as AttestationError, SingleAttestation + Error as AttestationError, SingleAttestation, }; pub use crate::attestation_data::AttestationData; pub use crate::attestation_duty::AttestationDuty;