Adding light_client gossip topics (#3693)

## Issue Addressed
Implementing the light_client_gossip topics but I'm not there yet.

Which issue # does this PR address?
Partially #3651

## Proposed Changes
Add light client gossip topics.
Please list or describe the changes introduced by this PR.
I'm going to Implement light_client_finality_update and light_client_optimistic_update gossip topics. Currently I've attempted the former and I'm seeking feedback.

## Additional Info
I've only implemented the light_client_finality_update topic because I wanted to make sure I was on the correct path. Also checking that the gossiped LightClientFinalityUpdate is the same as the locally constructed one is not implemented because caching the updates will make this much easier. Could someone give me some feedback on this please? 

Please provide any additional information. For example, future considerations
or information useful for reviewers.

Co-authored-by: GeemoCandama <104614073+GeemoCandama@users.noreply.github.com>
This commit is contained in:
GeemoCandama
2022-12-13 06:24:51 +00:00
parent c973bfc90c
commit 1b28ef8a8d
20 changed files with 778 additions and 40 deletions

View File

@@ -62,9 +62,9 @@ use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof,
SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
@@ -129,6 +129,14 @@ const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;
/// before we start dropping them.
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;
/// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
@@ -195,6 +203,8 @@ pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing";
pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature";
pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution";
pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const RPC_BLOCK: &str = "rpc_block";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const STATUS_PROCESSING: &str = "status_processing";
@@ -476,6 +486,42 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new `Work` event for some light client finality update.
pub fn gossip_light_client_finality_update(
message_id: MessageId,
peer_id: PeerId,
light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: true,
work: Work::GossipLightClientFinalityUpdate {
message_id,
peer_id,
light_client_finality_update,
seen_timestamp,
},
}
}
/// Create a new `Work` event for some light client optimistic update.
pub fn gossip_light_client_optimistic_update(
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: true,
work: Work::GossipLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
},
}
}
/// Create a new `Work` event for some attester slashing.
pub fn gossip_attester_slashing(
message_id: MessageId,
@@ -730,6 +776,18 @@ pub enum Work<T: BeaconChainTypes> {
sync_contribution: Box<SignedContributionAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipLightClientFinalityUpdate {
message_id: MessageId,
peer_id: PeerId,
light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipLightClientOptimisticUpdate {
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
seen_timestamp: Duration,
},
RpcBlock {
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
@@ -777,6 +835,8 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipAttesterSlashing { .. } => GOSSIP_ATTESTER_SLASHING,
Work::GossipSyncSignature { .. } => GOSSIP_SYNC_SIGNATURE,
Work::GossipSyncContribution { .. } => GOSSIP_SYNC_CONTRIBUTION,
Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::Status { .. } => STATUS_PROCESSING,
@@ -916,6 +976,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut gossip_attester_slashing_queue =
FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);
// Using a FIFO queue for light client updates to maintain sequence order.
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
@@ -1250,6 +1314,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipSyncContribution { .. } => {
sync_contribution_queue.push(work)
}
Work::GossipLightClientFinalityUpdate { .. } => {
finality_update_queue.push(work, work_id, &self.log)
}
Work::GossipLightClientOptimisticUpdate { .. } => {
optimistic_update_queue.push(work, work_id, &self.log)
}
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
Work::ChainSegment { ref process_id, .. } => match process_id {
ChainSegmentProcessId::RangeBatchId { .. }
@@ -1551,7 +1621,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
)
}),
/*
* Syn contribution verification.
* Sync contribution verification.
*/
Work::GossipSyncContribution {
message_id,
@@ -1566,6 +1636,38 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
/*
* Light client finality update verification.
*/
Work::GossipLightClientFinalityUpdate {
message_id,
peer_id,
light_client_finality_update,
seen_timestamp,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_finality_update(
message_id,
peer_id,
*light_client_finality_update,
seen_timestamp,
)
}),
/*
* Light client optimistic update verification.
*/
Work::GossipLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_optimistic_update(
message_id,
peer_id,
*light_client_optimistic_update,
seen_timestamp,
)
}),
/*
* Verification for beacon blocks received during syncing via RPC.
*/

View File

@@ -3,6 +3,8 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::store::Error;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
observed_operations::ObservationOutcome,
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms,
@@ -18,9 +20,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage,
SyncSubnetId,
};
use super::{
@@ -1303,6 +1306,138 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_IMPORTED_TOTAL);
}
pub fn process_gossip_finality_update(
self,
message_id: MessageId,
peer_id: PeerId,
light_client_finality_update: LightClientFinalityUpdate<T::EthSpec>,
seen_timestamp: Duration,
) {
match self
.chain
.verify_finality_update_for_gossip(light_client_finality_update, seen_timestamp)
{
Ok(_verified_light_client_finality_update) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
}
Err(e) => {
metrics::register_finality_update_error(&e);
match e {
LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => {
debug!(
self.log,
"LC invalid finality update";
"peer" => %peer_id,
"error" => ?e,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"light_client_gossip_error",
);
}
LightClientFinalityUpdateError::TooEarly => {
debug!(
self.log,
"LC finality update too early";
"peer" => %peer_id,
"error" => ?e,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"light_client_gossip_error",
);
}
LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!(
self.log,
"LC finality update already seen";
"peer" => %peer_id,
"error" => ?e,
),
LightClientFinalityUpdateError::BeaconChainError(_)
| LightClientFinalityUpdateError::LightClientUpdateError(_)
| LightClientFinalityUpdateError::SigSlotStartIsNone
| LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
self.log,
"LC error constructing finality update";
"peer" => %peer_id,
"error" => ?e,
),
}
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
};
}
pub fn process_gossip_optimistic_update(
self,
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
seen_timestamp: Duration,
) {
match self
.chain
.verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp)
{
Ok(_verified_light_client_optimistic_update) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
}
Err(e) => {
metrics::register_optimistic_update_error(&e);
match e {
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
debug!(
self.log,
"LC invalid optimistic update";
"peer" => %peer_id,
"error" => ?e,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"light_client_gossip_error",
)
}
LightClientOptimisticUpdateError::TooEarly => {
debug!(
self.log,
"LC optimistic update too early";
"peer" => %peer_id,
"error" => ?e,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"light_client_gossip_error",
);
}
LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!(
self.log,
"LC optimistic update already seen";
"peer" => %peer_id,
"error" => ?e,
),
LightClientOptimisticUpdateError::BeaconChainError(_)
| LightClientOptimisticUpdateError::LightClientUpdateError(_)
| LightClientOptimisticUpdateError::SigSlotStartIsNone
| LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!(
self.log,
"LC error constructing optimistic update";
"peer" => %peer_id,
"error" => ?e,
),
}
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
};
}
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
fn handle_attestation_verification_failure(

View File

@@ -1,5 +1,7 @@
use beacon_chain::{
attestation_verification::Error as AttnError,
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
sync_committee_verification::Error as SyncCommitteeError,
};
use fnv::FnvHashMap;
@@ -252,6 +254,19 @@ lazy_static! {
"Gossipsub sync_committee errors per error type",
&["type"]
);
pub static ref GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_light_client_finality_update_errors_per_type",
"Gossipsub light_client_finality_update errors per error type",
&["type"]
);
pub static ref GOSSIP_OPTIMISTIC_UPDATE_ERRORS_PER_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_light_client_optimistic_update_errors_per_type",
"Gossipsub light_client_optimistic_update errors per error type",
&["type"]
);
/*
* Network queue metrics
@@ -358,6 +373,14 @@ pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {
);
}
pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]);
}
pub fn register_optimistic_update_error(error: &LightClientOptimisticUpdateError) {
inc_counter_vec(&GOSSIP_OPTIMISTIC_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]);
}
pub fn register_attestation_error(error: &AttnError) {
inc_counter_vec(&GOSSIP_ATTESTATION_ERRORS_PER_TYPE, &[error.as_ref()]);
}

View File

@@ -280,6 +280,30 @@ impl<T: BeaconChainTypes> Router<T> {
sync_committtee_msg.0,
);
}
PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => {
trace!(
self.log,
"Received light client finality update";
"peer_id" => %peer_id
);
self.processor.on_light_client_finality_update_gossip(
id,
peer_id,
light_client_finality_update,
);
}
PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => {
trace!(
self.log,
"Received light client optimistic update";
"peer_id" => %peer_id
);
self.processor.on_light_client_optimistic_update_gossip(
id,
peer_id,
light_client_optimistic_update,
);
}
}
}
}

View File

@@ -17,8 +17,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::SyncCommitteeMessage;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId,
Attestation, AttesterSlashing, EthSpec, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof,
SignedVoluntaryExit, SubnetId, SyncSubnetId,
};
/// Processes validated messages from the network. It relays necessary data to the syncing thread
@@ -368,6 +369,34 @@ impl<T: BeaconChainTypes> Processor<T> {
))
}
pub fn on_light_client_finality_update_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_finality_update(
message_id,
peer_id,
light_client_finality_update,
timestamp_now(),
))
}
pub fn on_light_client_optimistic_update_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_optimistic_update(
message_id,
peer_id,
light_client_optimistic_update,
timestamp_now(),
))
}
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
self.beacon_processor_send
.try_send(work)

View File

@@ -208,6 +208,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
metrics_update: tokio::time::Interval,
/// gossipsub_parameter_update timer
gossipsub_parameter_update: tokio::time::Interval,
/// enable_light_client_server indicator
enable_light_client_server: bool,
/// The logger for the network service.
fork_context: Arc<ForkContext>,
log: slog::Logger,
@@ -345,6 +347,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
gossipsub_parameter_update,
fork_context,
log: network_log,
enable_light_client_server: config.enable_light_client_server,
};
network_service.spawn_service(executor);
@@ -679,6 +682,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
return;
}
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
for fork_digest in self.required_gossip_fork_digests() {
@@ -695,6 +699,25 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
if self.enable_light_client_server {
for light_client_topic_kind in
lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter()
{
for fork_digest in self.required_gossip_fork_digests() {
let light_client_topic = GossipTopic::new(
light_client_topic_kind.clone(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.subscribe(light_client_topic.clone()) {
subscribed_topics.push(light_client_topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic);
}
}
}
}
// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {