Fix some single attestation network plumbing

This commit is contained in:
Eitan Seri-Levi
2025-01-07 19:53:00 +07:00
parent e95f00342b
commit 823ddf5e75
10 changed files with 72 additions and 54 deletions

View File

@@ -60,9 +60,9 @@ use std::borrow::Cow;
use strum::AsRefStr;
use tree_hash::TreeHash;
use types::{
SingleAttestation, Attestation, AttestationRef, BeaconCommittee,
BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256,
IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId,
Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec,
CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof,
SignedAggregateAndProof, SingleAttestation, Slot, SubnetId,
};
pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations};

View File

@@ -81,13 +81,12 @@ use tokio_stream::{
StreamExt,
};
use types::{
SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation,
AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, CommitteeCache,
ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit,
SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{

View File

@@ -50,7 +50,7 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use types::{SingleAttestation, Attestation, EthSpec};
use types::{Attestation, EthSpec, SingleAttestation};
// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
#[derive(Debug)]

View File

@@ -39,9 +39,9 @@ use tokio::time::Duration;
use tree_hash::TreeHash;
use types::application_domain::ApplicationDomain;
use types::{
attestation::AttestationBase, SingleAttestation, AggregateSignature, BitList,
Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, MainnetEthSpec, RelativeEpoch,
SelectionProof, SignedRoot, Slot,
attestation::AttestationBase, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash,
Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, SingleAttestation,
Slot,
};
type E = MainnetEthSpec;

View File

@@ -6,16 +6,15 @@ use snap::raw::{decompress_len, Decoder, Encoder};
use ssz::{Decode, Encode};
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use types::SingleAttestation;
use types::{
Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase,
AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec,
ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase,
SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra,
BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName,
LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing,
SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra,
SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SingleAttestation,
SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
#[derive(Debug, Clone, PartialEq)]
@@ -194,32 +193,32 @@ impl<E: EthSpec> PubsubMessage<E> {
)))
}
GossipKind::Attestation(subnet_id) => {
let attestation =
match fork_context.from_context_bytes(gossip_topic.fork_digest) {
Some(&fork_name) => {
if fork_name.electra_enabled() {
Attestation::Electra(
AttestationElectra::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
)
} else {
Attestation::Base(
AttestationBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
)
}
match fork_context.from_context_bytes(gossip_topic.fork_digest) {
Some(&fork_name) => {
if fork_name.electra_enabled() {
let single_attestation =
SingleAttestation::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::SingleAttestation(Box::new((
*subnet_id,
single_attestation,
))))
} else {
let attestation = Attestation::Base(
AttestationBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
);
Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))))
}
None => {
return Err(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
))
}
};
Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))))
}
None => Err(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
)),
}
}
GossipKind::BeaconBlock => {
let beacon_block =

View File

@@ -549,7 +549,23 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// the attestation, else we just just propagate the Attestation.
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
attestation,
attestation.data(),
);
self.send_to_router(RouterMessage::PubsubMessage(
id,
source,
message,
should_process,
));
}
PubsubMessage::SingleAttestation(ref subnet_and_attestation) => {
let subnet_id = subnet_and_attestation.0;
let single_attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
&single_attestation.data,
);
self.send_to_router(RouterMessage::PubsubMessage(
id,

View File

@@ -17,7 +17,7 @@ use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDisco
use slog::{debug, error, o, warn};
use slot_clock::SlotClock;
use types::{
Attestation, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
AttestationData, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
ValidatorSubscription,
};
@@ -363,7 +363,7 @@ impl<T: BeaconChainTypes> SubnetService<T> {
pub fn should_process_attestation(
&self,
subnet: Subnet,
attestation: &Attestation<T::EthSpec>,
attestation_data: &AttestationData,
) -> bool {
// Proposer-only mode does not need to process attestations
if self.proposer_only {
@@ -374,7 +374,7 @@ impl<T: BeaconChainTypes> SubnetService<T> {
.map(|tracked_vals| {
tracked_vals.contains_key(&ExactSubnet {
subnet,
slot: attestation.data().slot,
slot: attestation_data.slot,
})
})
.unwrap_or(true)