mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-17 03:42:46 +00:00
Merge stable futures
This commit is contained in:
@@ -8,7 +8,7 @@ pub mod processor;
|
||||
|
||||
use crate::error;
|
||||
use crate::service::NetworkMessage;
|
||||
use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes, BlockError};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
|
||||
use eth2_libp2p::{
|
||||
rpc::{
|
||||
RPCCodedResponse, RPCError, RPCRequest, RPCResponse, RPCResponseErrorCode, RequestId,
|
||||
@@ -252,30 +252,28 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
match gossip_message {
|
||||
// Attestations should never reach the router.
|
||||
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => {
|
||||
if self
|
||||
.processor
|
||||
.should_forward_aggregate_attestation(&aggregate_and_proof)
|
||||
if let Some(gossip_verified) =
|
||||
self.processor.verify_aggregated_attestation_for_gossip(
|
||||
peer_id.clone(),
|
||||
*aggregate_and_proof.clone(),
|
||||
)
|
||||
{
|
||||
self.propagate_message(id, peer_id.clone());
|
||||
self.processor
|
||||
.import_aggregated_attestation(peer_id, gossip_verified);
|
||||
}
|
||||
self.processor.process_attestation_gossip(
|
||||
peer_id,
|
||||
aggregate_and_proof.message.aggregate,
|
||||
AttestationType::Aggregated,
|
||||
);
|
||||
}
|
||||
PubsubMessage::Attestation(subnet_attestation) => {
|
||||
if self
|
||||
.processor
|
||||
.should_forward_attestation(&subnet_attestation.1)
|
||||
if let Some(gossip_verified) =
|
||||
self.processor.verify_unaggregated_attestation_for_gossip(
|
||||
peer_id.clone(),
|
||||
subnet_attestation.1.clone(),
|
||||
)
|
||||
{
|
||||
self.propagate_message(id, peer_id.clone());
|
||||
self.processor
|
||||
.import_unaggregated_attestation(peer_id, gossip_verified);
|
||||
}
|
||||
self.processor.process_attestation_gossip(
|
||||
peer_id,
|
||||
subnet_attestation.1,
|
||||
AttestationType::Unaggregated { should_store: true },
|
||||
);
|
||||
}
|
||||
PubsubMessage::BeaconBlock(block) => {
|
||||
match self.processor.should_forward_block(&peer_id, block) {
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::sync::{PeerSyncInfo, SyncMessage};
|
||||
use beacon_chain::{
|
||||
AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, BlockError,
|
||||
BlockProcessingOutcome, GossipVerifiedBlock,
|
||||
attestation_verification::{
|
||||
Error as AttnError, IntoForkChoiceVerifiedAttestation, VerifiedAggregatedAttestation,
|
||||
VerifiedUnaggregatedAttestation,
|
||||
},
|
||||
BeaconChain, BeaconChainTypes, BlockError, BlockProcessingOutcome, GossipVerifiedBlock,
|
||||
};
|
||||
use eth2_libp2p::rpc::methods::*;
|
||||
use eth2_libp2p::rpc::{RPCCodedResponse, RPCEvent, RPCRequest, RPCResponse, RequestId};
|
||||
@@ -548,77 +551,322 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
true
|
||||
}
|
||||
|
||||
/// Verifies the Aggregate attestation before propagating.
|
||||
pub fn should_forward_aggregate_attestation(
|
||||
&self,
|
||||
_aggregate_and_proof: &Box<SignedAggregateAndProof<T::EthSpec>>,
|
||||
) -> bool {
|
||||
// TODO: Implement
|
||||
true
|
||||
}
|
||||
|
||||
/// Verifies the attestation before propagating.
|
||||
pub fn should_forward_attestation(&self, _aggregate: &Attestation<T::EthSpec>) -> bool {
|
||||
// TODO: Implement
|
||||
true
|
||||
}
|
||||
|
||||
/// Process a new attestation received from gossipsub.
|
||||
pub fn process_attestation_gossip(
|
||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||
/// network.
|
||||
pub fn handle_attestation_verification_failure(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
msg: Attestation<T::EthSpec>,
|
||||
attestation_type: AttestationType,
|
||||
beacon_block_root: Hash256,
|
||||
attestation_type: &str,
|
||||
error: AttnError,
|
||||
) {
|
||||
match self
|
||||
.chain
|
||||
.process_attestation(msg.clone(), attestation_type)
|
||||
{
|
||||
Ok(outcome) => match outcome {
|
||||
AttestationProcessingOutcome::Processed => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Processed attestation";
|
||||
"source" => "gossip",
|
||||
"peer" => format!("{:?}",peer_id),
|
||||
"block_root" => format!("{}", msg.data.beacon_block_root),
|
||||
"slot" => format!("{}", msg.data.slot),
|
||||
);
|
||||
}
|
||||
AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => {
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
debug!(
|
||||
self.log,
|
||||
"Invalid attestation from network";
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"peer_id" => format!("{:?}", peer_id),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
|
||||
match error {
|
||||
AttnError::FutureEpoch { .. }
|
||||
| AttnError::PastEpoch { .. }
|
||||
| AttnError::FutureSlot { .. }
|
||||
| AttnError::PastSlot { .. } => {
|
||||
/*
|
||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||
*
|
||||
*
|
||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||
*/
|
||||
}
|
||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||
/*
|
||||
* These errors are caused by invalid signatures.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::EmptyAggregationBitfield => {
|
||||
/*
|
||||
* The aggregate had no signatures and is therefore worthless.
|
||||
*
|
||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||
* violation of the spec nor indication of fault.
|
||||
*
|
||||
* This may change soon. Reference:
|
||||
*
|
||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorNotInCommittee { .. } => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* There has already been an aggregate attestation seen from this
|
||||
* aggregator index.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
}
|
||||
AttnError::PriorAttestationKnown { .. } => {
|
||||
/*
|
||||
* We have already seen an attestation from this validator for this epoch.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
}
|
||||
AttnError::ValidatorIndexTooHigh(_) => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||
// just old. See:
|
||||
//
|
||||
// https://github.com/sigp/lighthouse/issues/1039
|
||||
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => format!("{:?}", peer_id),
|
||||
"block" => format!("{}", beacon_block_root)
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root));
|
||||
}
|
||||
AttestationProcessingOutcome::FutureEpoch { .. }
|
||||
| AttestationProcessingOutcome::PastEpoch { .. }
|
||||
| AttestationProcessingOutcome::UnknownTargetRoot { .. }
|
||||
| AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation
|
||||
AttestationProcessingOutcome::Invalid { .. }
|
||||
| AttestationProcessingOutcome::EmptyAggregationBitfield { .. }
|
||||
| AttestationProcessingOutcome::AttestsToFutureBlock { .. }
|
||||
| AttestationProcessingOutcome::InvalidSignature
|
||||
| AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. }
|
||||
| AttestationProcessingOutcome::BadTargetEpoch { .. } => {
|
||||
// the peer has sent a bad attestation. Remove them.
|
||||
self.network.disconnect(peer_id, GoodbyeReason::Fault);
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
// error is logged during the processing therefore no error is logged here
|
||||
trace!(
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root));
|
||||
}
|
||||
AttnError::UnknownTargetRoot(_) => {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
* 2. We do not have the target root in our DB.
|
||||
*
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BadTargetEpoch => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||
/*
|
||||
* It is not possible to attest this the given committee in the given slot.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||
/*
|
||||
* The unaggregated attestation doesn't have only one signature.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestsToFutureBlock { .. } => {
|
||||
/*
|
||||
* The beacon_block_root is from a higher slot than the attestation.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::Invalid(_) => {
|
||||
/*
|
||||
* The attestation failed the state_processing verification.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BeaconChainError(e) => {
|
||||
/*
|
||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||
* so we have a bug.
|
||||
*
|
||||
* It's not clear if the message is invalid/malicious.
|
||||
*/
|
||||
error!(
|
||||
self.log,
|
||||
"Erroneous gossip attestation ssz";
|
||||
"ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())),
|
||||
"Unable to validate aggregate";
|
||||
"peer_id" => format!("{:?}", peer_id),
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn verify_aggregated_attestation_for_gossip(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
aggregate_and_proof: SignedAggregateAndProof<T::EthSpec>,
|
||||
) -> Option<VerifiedAggregatedAttestation<T>> {
|
||||
// This is provided to the error handling function to assist with debugging.
|
||||
let beacon_block_root = aggregate_and_proof.message.aggregate.data.beacon_block_root;
|
||||
|
||||
self.chain
|
||||
.verify_aggregated_attestation_for_gossip(aggregate_and_proof)
|
||||
.map_err(|e| {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"aggregated",
|
||||
e,
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
pub fn import_aggregated_attestation(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
verified_attestation: VerifiedAggregatedAttestation<T>,
|
||||
) {
|
||||
// This is provided to the error handling function to assist with debugging.
|
||||
let beacon_block_root = verified_attestation.attestation().data.beacon_block_root;
|
||||
|
||||
self.apply_attestation_to_fork_choice(
|
||||
peer_id.clone(),
|
||||
beacon_block_root,
|
||||
&verified_attestation,
|
||||
);
|
||||
|
||||
if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for op pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn verify_unaggregated_attestation_for_gossip(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
unaggregated_attestation: Attestation<T::EthSpec>,
|
||||
) -> Option<VerifiedUnaggregatedAttestation<T>> {
|
||||
// This is provided to the error handling function to assist with debugging.
|
||||
let beacon_block_root = unaggregated_attestation.data.beacon_block_root;
|
||||
|
||||
self.chain
|
||||
.verify_unaggregated_attestation_for_gossip(unaggregated_attestation)
|
||||
.map_err(|e| {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"unaggregated",
|
||||
e,
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
pub fn import_unaggregated_attestation(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
verified_attestation: VerifiedUnaggregatedAttestation<T>,
|
||||
) {
|
||||
// This is provided to the error handling function to assist with debugging.
|
||||
let beacon_block_root = verified_attestation.attestation().data.beacon_block_root;
|
||||
|
||||
self.apply_attestation_to_fork_choice(
|
||||
peer_id.clone(),
|
||||
beacon_block_root,
|
||||
&verified_attestation,
|
||||
);
|
||||
|
||||
if let Err(e) = self
|
||||
.chain
|
||||
.add_to_naive_aggregation_pool(verified_attestation)
|
||||
{
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for agg pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply the attestation to fork choice, suppressing errors.
|
||||
///
|
||||
/// We suppress the errors when adding an attestation to fork choice since the spec
|
||||
/// permits gossiping attestations that are invalid to be applied to fork choice.
|
||||
///
|
||||
/// An attestation that is invalid for fork choice can still be included in a block.
|
||||
///
|
||||
/// Reference:
|
||||
/// https://github.com/ethereum/eth2.0-specs/issues/1408#issuecomment-617599260
|
||||
fn apply_attestation_to_fork_choice<'a>(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
beacon_block_root: Hash256,
|
||||
attestation: &'a impl IntoForkChoiceVerifiedAttestation<'a, T>,
|
||||
) {
|
||||
if let Err(e) = self.chain.apply_attestation_to_fork_choice(attestation) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,9 +15,9 @@ use rest_types::ValidatorSubscription;
|
||||
use slog::{debug, error, info, trace};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::timer::Delay;
|
||||
use tokio::time::{delay_for, Delay};
|
||||
use types::EthSpec;
|
||||
|
||||
mod tests;
|
||||
@@ -56,7 +56,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
pub fn start(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
config: &NetworkConfig,
|
||||
executor: &TaskExecutor,
|
||||
runtime_handle: &Handle,
|
||||
network_log: slog::Logger,
|
||||
) -> error::Result<(
|
||||
Arc<NetworkGlobals<T::EthSpec>>,
|
||||
@@ -95,7 +95,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
beacon_chain.clone(),
|
||||
network_globals.clone(),
|
||||
network_send.clone(),
|
||||
executor,
|
||||
runtime_handle,
|
||||
network_log.clone(),
|
||||
)?;
|
||||
|
||||
@@ -118,7 +118,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
propagation_percentage,
|
||||
};
|
||||
|
||||
let network_exit = spawn_service(network_service, &executor)?;
|
||||
let network_exit = runtime_handle.enter(|| spawn_service(network_service))?;
|
||||
|
||||
Ok((network_globals, network_send, network_exit))
|
||||
}
|
||||
@@ -126,48 +126,45 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
|
||||
fn spawn_service<T: BeaconChainTypes>(
|
||||
mut service: NetworkService<T>,
|
||||
executor: &TaskExecutor,
|
||||
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
|
||||
let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
// spawn on the current executor
|
||||
executor.spawn(
|
||||
futures::future::poll_fn(move || -> Result<_, ()> {
|
||||
|
||||
tokio::spawn(futures::future::poll_fn(move || -> Result<_, ()> {
|
||||
let log = &service.log;
|
||||
|
||||
// handles any logic which requires an initial delay
|
||||
if !service.initial_delay.is_elapsed() {
|
||||
if let Ok(Async::Ready(_)) = service.initial_delay.poll() {
|
||||
let multi_addrs = Swarm::listeners(&service.libp2p.swarm).cloned().collect();
|
||||
*service.network_globals.listen_multiaddrs.write() = multi_addrs;
|
||||
let multi_addrs = Swarm::listeners(&service.libp2p.swarm).cloned().collect();
|
||||
*service.network_globals.listen_multiaddrs.write() = multi_addrs;
|
||||
}
|
||||
}
|
||||
|
||||
// perform termination tasks when the network is being shutdown
|
||||
if let Ok(Async::Ready(_)) | Err(_) = exit_rx.poll() {
|
||||
// network thread is terminating
|
||||
let enrs: Vec<Enr> = service.libp2p.swarm.enr_entries().cloned().collect();
|
||||
debug!(
|
||||
log,
|
||||
"Persisting DHT to store";
|
||||
"Number of peers" => format!("{}", enrs.len()),
|
||||
);
|
||||
// network thread is terminating
|
||||
let enrs: Vec<Enr> = service.libp2p.swarm.enr_entries().cloned().collect();
|
||||
debug!(
|
||||
log,
|
||||
"Persisting DHT to store";
|
||||
"Number of peers" => format!("{}", enrs.len()),
|
||||
);
|
||||
|
||||
match persist_dht::<T::Store, T::EthSpec>(service.store.clone(), enrs) {
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Failed to persist DHT on drop";
|
||||
"error" => format!("{:?}", e)
|
||||
),
|
||||
Ok(_) => info!(
|
||||
log,
|
||||
"Saved DHT state";
|
||||
),
|
||||
}
|
||||
match persist_dht::<T::Store, T::EthSpec>(service.store.clone(), enrs) {
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Failed to persist DHT on drop";
|
||||
"error" => format!("{:?}", e)
|
||||
),
|
||||
Ok(_) => info!(
|
||||
log,
|
||||
"Saved DHT state";
|
||||
),
|
||||
}
|
||||
|
||||
info!(log.clone(), "Network service shutdown");
|
||||
return Ok(Async::Ready(()));
|
||||
info!(log.clone(), "Network service shutdown");
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
|
||||
// processes the network channel before processing the libp2p swarm
|
||||
@@ -201,7 +198,8 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
"propagation_peer" => format!("{:?}", propagation_source),
|
||||
"message_id" => message_id.to_string(),
|
||||
);
|
||||
service.libp2p
|
||||
service
|
||||
.libp2p
|
||||
.swarm
|
||||
.propagate_message(&propagation_source, message_id);
|
||||
}
|
||||
@@ -223,10 +221,10 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
} else {
|
||||
let mut topic_kinds = Vec::new();
|
||||
for message in &messages {
|
||||
if !topic_kinds.contains(&message.kind()) {
|
||||
topic_kinds.push(message.kind());
|
||||
}
|
||||
if !topic_kinds.contains(&message.kind()) {
|
||||
topic_kinds.push(message.kind());
|
||||
}
|
||||
}
|
||||
debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", topic_kinds));
|
||||
service.libp2p.swarm.publish(messages);
|
||||
}
|
||||
@@ -237,10 +235,11 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
std::time::Duration::from_secs(BAN_PEER_TIMEOUT),
|
||||
);
|
||||
}
|
||||
NetworkMessage::Subscribe { subscriptions } =>
|
||||
{
|
||||
// the result is dropped as it used solely for ergonomics
|
||||
let _ = service.attestation_service.validator_subscriptions(subscriptions);
|
||||
NetworkMessage::Subscribe { subscriptions } => {
|
||||
// the result is dropped as it used solely for ergonomics
|
||||
let _ = service
|
||||
.attestation_service
|
||||
.validator_subscriptions(subscriptions);
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => break,
|
||||
@@ -258,24 +257,26 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
// process any attestation service events
|
||||
// NOTE: This must come after the network message processing as that may trigger events in
|
||||
// the attestation service.
|
||||
while let Ok(Async::Ready(Some(attestation_service_message))) = service.attestation_service.poll() {
|
||||
while let Ok(Async::Ready(Some(attestation_service_message))) =
|
||||
service.attestation_service.poll()
|
||||
{
|
||||
match attestation_service_message {
|
||||
// TODO: Implement
|
||||
AttServiceMessage::Subscribe(subnet_id) => {
|
||||
service.libp2p.swarm.subscribe_to_subnet(subnet_id);
|
||||
},
|
||||
}
|
||||
AttServiceMessage::Unsubscribe(subnet_id) => {
|
||||
service.libp2p.swarm.subscribe_to_subnet(subnet_id);
|
||||
},
|
||||
}
|
||||
AttServiceMessage::EnrAdd(subnet_id) => {
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
|
||||
},
|
||||
}
|
||||
AttServiceMessage::EnrRemove(subnet_id) => {
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
|
||||
},
|
||||
}
|
||||
AttServiceMessage::DiscoverPeers(subnet_id) => {
|
||||
service.libp2p.swarm.peers_request(subnet_id);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,26 +290,38 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event {
|
||||
peers_to_ban.push(peer_id.clone());
|
||||
};
|
||||
service.router_send
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::RPC(peer_id, rpc_event))
|
||||
.map_err(|_| { debug!(log, "Failed to send RPC to router");} )?;
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send RPC to router");
|
||||
})?;
|
||||
}
|
||||
BehaviourEvent::PeerDialed(peer_id) => {
|
||||
debug!(log, "Peer Dialed"; "peer_id" => format!("{}", peer_id));
|
||||
service.router_send
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::PeerDialed(peer_id))
|
||||
.map_err(|_| { debug!(log, "Failed to send peer dialed to router");})?;
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send peer dialed to router");
|
||||
})?;
|
||||
}
|
||||
BehaviourEvent::PeerDisconnected(peer_id) => {
|
||||
debug!(log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id));
|
||||
service.router_send
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::PeerDisconnected(peer_id))
|
||||
.map_err(|_| { debug!(log, "Failed to send peer disconnect to router");})?;
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send peer disconnect to router");
|
||||
})?;
|
||||
}
|
||||
BehaviourEvent::StatusPeer(peer_id) => {
|
||||
service.router_send
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::StatusPeer(peer_id))
|
||||
.map_err(|_| { debug!(log, "Failed to send re-status peer to router");})?;
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send re-status peer to router");
|
||||
})?;
|
||||
}
|
||||
BehaviourEvent::PubsubMessage {
|
||||
id,
|
||||
@@ -316,7 +329,6 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
|
||||
match message {
|
||||
// attestation information gets processed in the attestation service
|
||||
PubsubMessage::Attestation(ref subnet_and_attestation) => {
|
||||
@@ -324,17 +336,28 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
let attestation = &subnet_and_attestation.1;
|
||||
// checks if we have an aggregator for the slot. If so, we process
|
||||
// the attestation
|
||||
if service.attestation_service.should_process_attestation(&id, &source, subnet, attestation) {
|
||||
service.router_send
|
||||
.try_send(RouterMessage::PubsubMessage(id, source, message))
|
||||
.map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?;
|
||||
}
|
||||
if service.attestation_service.should_process_attestation(
|
||||
&id,
|
||||
&source,
|
||||
subnet,
|
||||
attestation,
|
||||
) {
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::PubsubMessage(id, source, message))
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send pubsub message to router");
|
||||
})?;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// all else is sent to the router
|
||||
service.router_send
|
||||
.try_send(RouterMessage::PubsubMessage(id, source, message))
|
||||
.map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?;
|
||||
service
|
||||
.router_send
|
||||
.try_send(RouterMessage::PubsubMessage(id, source, message))
|
||||
.map_err(|_| {
|
||||
debug!(log, "Failed to send pubsub message to router");
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -355,19 +378,20 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
}
|
||||
|
||||
// if we have just forked, update inform the libp2p layer
|
||||
if let Some(mut update_fork_delay) = service.next_fork_update.take() {
|
||||
if let Some(mut update_fork_delay) = service.next_fork_update.take() {
|
||||
if !update_fork_delay.is_elapsed() {
|
||||
if let Ok(Async::Ready(_)) = update_fork_delay.poll() {
|
||||
service.libp2p.swarm.update_fork_version(service.beacon_chain.enr_fork_id());
|
||||
service.next_fork_update = next_fork_delay(&service.beacon_chain);
|
||||
service
|
||||
.libp2p
|
||||
.swarm
|
||||
.update_fork_version(service.beacon_chain.enr_fork_id());
|
||||
service.next_fork_update = next_fork_delay(&service.beacon_chain);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
})
|
||||
|
||||
);
|
||||
}));
|
||||
|
||||
Ok(network_exit)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user