Compare commits

...

4 Commits

Author SHA1 Message Date
Paul Hauner
ebb25b5569 Bump version to v0.2.6 (#1549)
## Issue Addressed

NA

## Proposed Changes

See title.

## Additional Info

NA
2020-08-19 09:31:01 +00:00
Pawan Dhananjay
bbed42f30c Refactor attestation service (#1415)
## Issue Addressed

N/A

## Proposed Changes

Refactor the attestation service to send out requests to find peers for subnets as soon as we get attestation duties. 
Earlier, we had much more involved logic that sent discovery requests to the discovery service only 6 slots before the attestation slot. Now that discovery is much smarter with grouped queries, the complexity in the attestation service can be reduced considerably.
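
The refactor first reduces the incoming subscriptions to one entry per subnet, keyed by the highest slot seen, and then issues a single grouped `DiscoverPeers` request. A minimal sketch of that reduction, using plain `u64` stand-ins for the real `SubnetId` and `Slot` types:

```rust
use std::collections::HashMap;

type SubnetId = u64; // stand-in for types::SubnetId
type Slot = u64; // stand-in for types::Slot

/// Keep only the highest slot seen for each subnet; a higher slot yields a
/// higher `min_ttl` in the resulting `SubnetDiscovery` entry.
fn highest_slot_per_subnet(subscriptions: &[(SubnetId, Slot)]) -> HashMap<SubnetId, Slot> {
    let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
    for &(subnet_id, slot) in subscriptions {
        let highest = subnets_to_discover.entry(subnet_id).or_insert(slot);
        if slot > *highest {
            *highest = slot;
        }
    }
    subnets_to_discover
}
```

The map is then converted into one `AttServiceMessage::DiscoverPeers(Vec<SubnetDiscovery>)` event, rather than one timed request per subscription.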



Co-authored-by: Age Manning <Age@AgeManning.com>
2020-08-19 08:46:25 +00:00
divma
fdc6e2aa8e Shutdown like a Sir (#1545)
## Issue Addressed
#1494 

## Proposed Changes
- Give the `TaskExecutor` the sender side of a channel that a task can clone to request a shutdown
- The receiver side of this channel lives in `environment`; we now block until ctrl+c or an internal shutdown signal is received
- The swarm now informs us when it has reached 0 listeners
- The network receives this message and requests the shutdown (a minimal sketch of this flow follows)
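
A minimal sketch of the flow, assuming tokio plus the `futures` mpsc channel used elsewhere in this diff; the real wiring lives in `environment` and `TaskExecutor`:

```rust
use futures::channel::mpsc;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // The TaskExecutor holds the sender; tasks clone it to request shutdown.
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<&'static str>(1);

    let mut tx = shutdown_tx.clone();
    tokio::spawn(async move {
        // e.g. the network service, after the swarm reports zero listeners
        let _ = tx.try_send("network: reached zero listeners");
    });

    // The environment blocks until ctrl+c or an internal shutdown request.
    tokio::select! {
        _ = tokio::signal::ctrl_c() => println!("shutting down: ctrl+c"),
        Some(reason) = shutdown_rx.next() => println!("shutting down: {}", reason),
    }
}
```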
2020-08-19 05:51:14 +00:00
Paul Hauner
8e7dd7b2b1 Add remaining network ops to queuing system (#1546)
## Issue Addressed

NA

## Proposed Changes

- Refactors the `BeaconProcessor` to remove some excessive nesting and file bloat
  - Sorry about the noise from this; it's all contained in 4d3f8c5, though.
- Adds exits, proposer slashings and attester slashings to the `BeaconProcessor` so we don't get overwhelmed by large numbers of slashings (which happened a few hours ago); a rough sketch of the queues' drop-on-full behaviour follows.
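
The new queues are bounded, so a flood of gossip can only ever occupy a fixed amount of memory. A rough, hypothetical sketch of that drop-on-full behaviour, not the actual `FifoQueue` (which also logs dropped work):

```rust
use std::collections::VecDeque;

/// A bounded FIFO in the spirit of the processor's `FifoQueue`.
struct BoundedFifo<T> {
    queue: VecDeque<T>,
    max_len: usize,
}

impl<T> BoundedFifo<T> {
    fn new(max_len: usize) -> Self {
        Self { queue: VecDeque::with_capacity(max_len), max_len }
    }

    /// Refuse the incoming item when full, e.g. once
    /// MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN (4,096) items are queued.
    fn push(&mut self, item: T) -> bool {
        if self.queue.len() < self.max_len {
            self.queue.push_back(item);
            true
        } else {
            false
        }
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}
```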

## Additional Info

NA
2020-08-19 05:09:53 +00:00
31 changed files with 1377 additions and 1079 deletions

17
Cargo.lock generated
View File

@@ -2,7 +2,7 @@
# It is not intended for manual editing.
[[package]]
name = "account_manager"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"account_utils",
"bls",
@@ -373,7 +373,7 @@ dependencies = [
[[package]]
name = "beacon_node"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"beacon_chain",
"clap",
@@ -530,7 +530,7 @@ dependencies = [
[[package]]
name = "boot_node"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"clap",
"discv5",
@@ -1699,9 +1699,9 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "flate2"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e"
checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94"
dependencies = [
"cfg-if",
"crc32fast",
@@ -2523,7 +2523,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lcli"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"bls",
"clap",
@@ -2874,14 +2874,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af67924b8dd885cccea261866c8ce5b74d239d272e154053ff927dae839f5ae9"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "lighthouse"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"account_manager",
"account_utils",
@@ -6017,7 +6016,7 @@ dependencies = [
[[package]]
name = "validator_client"
version = "0.2.5"
version = "0.2.6"
dependencies = [
"account_utils",
"bls",

View File

@@ -1,6 +1,6 @@
[package]
name = "account_manager"
version = "0.2.5"
version = "0.2.6"
authors = ["Paul Hauner <paul@paulhauner.com>", "Luke Anderson <luke@sigmaprime.io>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "beacon_node"
version = "0.2.5"
version = "0.2.6"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic};
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
@@ -29,7 +29,6 @@ use std::{
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};
@@ -301,8 +300,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time until which we
would like to retain the peers.
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
self.peer_manager.discover_subnet_peers(subnet_id, min_ttl)
pub fn discover_subnet_peers(&mut self, subnet_subscriptions: Vec<SubnetDiscovery>) {
self.peer_manager
.discover_subnet_peers(subnet_subscriptions)
}
/// Updates the local ENR's "eth2" field with the latest EnrForkId.

View File

@@ -8,7 +8,7 @@ pub use enr_ext::{CombinedKeyExt, EnrExt};
pub use libp2p::core::identity::Keypair;
use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery};
use discv5::{enr::NodeId, Discv5, Discv5Event};
use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
@@ -305,12 +305,19 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
/// Processes a request to search for more peers on a subnet.
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
// If the discv5 service isn't running, ignore queries
if !self.started {
return;
}
self.add_subnet_query(subnet_id, min_ttl, 0);
debug!(
self.log,
"Making discovery query for subnets";
"subnets" => format!("{:?}", subnets_to_discover.iter().map(|s| s.subnet_id).collect::<Vec<_>>())
);
for subnet in subnets_to_discover {
self.add_subnet_query(subnet.subnet_id, subnet.min_ttl, 0);
}
}
/// Add an ENR to the routing table of the discovery mechanism.
@@ -514,6 +521,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// This query is for searching for peers of a particular subnet
// Drain subnet_queries so we can re-use it as we continue to process the queue
let grouped_queries: Vec<SubnetQuery> = subnet_queries.drain(..).collect();
debug!(
self.log,
"Starting grouped subnet query";
"subnets" => format!("{:?}", grouped_queries.iter().map(|q| q.subnet_id).collect::<Vec<_>>()),
);
self.start_subnet_query(grouped_queries);
}
}

View File

@@ -1,5 +1,6 @@
///! The subnet predicate used for searching for a particular subnet.
use super::*;
use slog::{debug, trace};
use std::ops::Deref;
/// Returns the predicate for a given subnet.
@@ -30,7 +31,7 @@ where
.collect();
if matches.is_empty() {
debug!(
trace!(
log_clone,
"Peer found but not on any of the desired subnets";
"peer_id" => format!("{}", enr.peer_id())

View File

@@ -14,7 +14,7 @@ pub mod rpc;
mod service;
pub mod types;
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage};
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};

View File

@@ -4,7 +4,7 @@ pub use self::peerdb::*;
use crate::discovery::{Discovery, DiscoveryEvent};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{error, metrics};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
use futures::prelude::*;
use futures::Stream;
use hashset_delay::HashSetDelay;
@@ -19,7 +19,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use types::{EthSpec, SubnetId};
use types::EthSpec;
pub use libp2p::core::{identity::Keypair, Multiaddr};
@@ -213,17 +213,19 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
/// A request to find peers on a given subnet.
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
// Extend the time to maintain peers if required.
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(subnet_id, min_ttl);
for s in subnets_to_discover.iter() {
if let Some(min_ttl) = s.min_ttl {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(s.subnet_id, min_ttl);
}
}
// request the subnet query from discovery
self.discovery.discover_subnet_peers(subnet_id, min_ttl);
self.discovery.discover_subnet_peers(subnets_to_discover);
}
/// A STATUS message has been received from a peer. This resets the status timer.

View File

@@ -36,6 +36,8 @@ pub enum Libp2pEvent<TSpec: EthSpec> {
Behaviour(BehaviourEvent<TSpec>),
/// A new listening address has been established.
NewListenAddr(Multiaddr),
/// We reached zero listening addresses.
ZeroListeners,
}
/// The configuration and state of the libp2p components for the beacon node.
@@ -283,10 +285,17 @@ impl<TSpec: EthSpec> Service<TSpec> {
debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string())
}
SwarmEvent::ListenerClosed { addresses, reason } => {
crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason))
crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason));
if Swarm::listeners(&self.swarm).count() == 0 {
return Libp2pEvent::ZeroListeners;
}
}
SwarmEvent::ListenerError { error } => {
warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()))
// this is non fatal, but we still check
warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()));
if Swarm::listeners(&self.swarm).count() == 0 {
return Libp2pEvent::ZeroListeners;
}
}
SwarmEvent::Dialing(peer_id) => {
debug!(self.log, "Dialing peer"; "peer_id" => peer_id.to_string());

View File

@@ -1,6 +1,7 @@
pub mod error;
mod globals;
mod pubsub;
mod subnet;
mod sync_state;
mod topics;
@@ -13,5 +14,6 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::PubsubMessage;
pub use subnet::SubnetDiscovery;
pub use sync_state::SyncState;
pub use topics::{GossipEncoding, GossipKind, GossipTopic};

View File

@@ -0,0 +1,28 @@
use std::time::{Duration, Instant};
use types::SubnetId;
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
/// A subnet to discover peers on along with the instant after which it's no longer useful.
#[derive(Debug, Clone)]
pub struct SubnetDiscovery {
pub subnet_id: SubnetId,
pub min_ttl: Option<Instant>,
}
impl PartialEq for SubnetDiscovery {
fn eq(&self, other: &SubnetDiscovery) -> bool {
self.subnet_id == other.subnet_id
&& match (self.min_ttl, other.min_ttl) {
(Some(min_ttl_instant), Some(other_min_ttl_instant)) => {
min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
< DURATION_DIFFERENCE
&& other_min_ttl_instant.saturating_duration_since(min_ttl_instant)
< DURATION_DIFFERENCE
}
(None, None) => true,
(None, Some(_)) => true,
(Some(_), None) => true,
}
}
}
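// A hypothetical test sketch (not part of this diff) exercising the tolerant
// equality above: `min_ttl`s within DURATION_DIFFERENCE (1ms) of each other
// compare equal, and a missing `min_ttl` matches anything on the same subnet.
// It assumes the `SubnetId::new` constructor from the `types` crate.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn min_ttl_tolerance() {
        let now = Instant::now();
        let a = SubnetDiscovery { subnet_id: SubnetId::new(3), min_ttl: Some(now) };
        let b = SubnetDiscovery {
            subnet_id: SubnetId::new(3),
            // 500µs apart: well inside the 1ms tolerance.
            min_ttl: Some(now + Duration::from_micros(500)),
        };
        assert_eq!(a, b);
        // A request without a min_ttl compares equal to any min_ttl.
        let c = SubnetDiscovery { subnet_id: SubnetId::new(3), min_ttl: None };
        assert_eq!(a, c);
    }
}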

View File

@@ -94,8 +94,13 @@ pub async fn build_libp2p_instance(boot_nodes: Vec<Enr>, log: slog::Logger) -> L
// launch libp2p service
let (signal, exit) = exit_future::signal();
let executor =
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = environment::TaskExecutor::new(
tokio::runtime::Handle::current(),
exit,
log.clone(),
shutdown_tx,
);
Libp2pInstance(
LibP2PService::new(executor, &config, EnrForkId::default(), &log)
.await

View File

@@ -2,7 +2,7 @@
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -13,7 +13,7 @@ use rand::seq::SliceRandom;
use slog::{crit, debug, error, o, trace, warn};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{types::GossipKind, NetworkGlobals};
use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery};
use hashset_delay::HashSetDelay;
use rest_types::ValidatorSubscription;
use slot_clock::SlotClock;
@@ -25,11 +25,8 @@ mod tests;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 1;
/// The number of slots ahead that we attempt to discover peers for a subscription. If the slot to
/// attest to is greater than this, we queue a discovery request for this many slots prior to
/// subscribing.
const TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 6;
/// A subnet discovery query takes at most 30 secs; 2 slots take 24s.
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
/// gossip topics that we subscribed to due to the validator connection.
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
@@ -39,12 +36,10 @@ const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
/// Note: The time is calculated as `time = milliseconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`.
const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
/// The default number of slots before items in hash delay sets used by this class should expire.
/// 36s at 12s slot time
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
// 36s at 12s slot time
/// The duration difference between two instants for them to be considered equal.
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
#[derive(Debug, Eq, Clone)]
#[derive(Debug, PartialEq, Clone)]
pub enum AttServiceMessage {
/// Subscribe to the specified subnet id.
Subscribe(SubnetId),
@@ -54,44 +49,8 @@ pub enum AttServiceMessage {
EnrAdd(SubnetId),
/// Remove the `SubnetId` from the ENR bitfield.
EnrRemove(SubnetId),
/// Discover peers for a particular subnet.
/// This includes the `Instant` until which we need the discovered peer.
DiscoverPeers {
subnet_id: SubnetId,
min_ttl: Option<Instant>,
},
}
impl PartialEq for AttServiceMessage {
fn eq(&self, other: &AttServiceMessage) -> bool {
match (self, other) {
(&AttServiceMessage::Subscribe(a), &AttServiceMessage::Subscribe(b)) => a == b,
(&AttServiceMessage::Unsubscribe(a), &AttServiceMessage::Unsubscribe(b)) => a == b,
(&AttServiceMessage::EnrAdd(a), &AttServiceMessage::EnrAdd(b)) => a == b,
(&AttServiceMessage::EnrRemove(a), &AttServiceMessage::EnrRemove(b)) => a == b,
(
&AttServiceMessage::DiscoverPeers { subnet_id, min_ttl },
&AttServiceMessage::DiscoverPeers {
subnet_id: other_subnet_id,
min_ttl: other_min_ttl,
},
) => {
subnet_id == other_subnet_id
&& match (min_ttl, other_min_ttl) {
(Some(min_ttl_instant), Some(other_min_ttl_instant)) => {
min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
< DURATION_DIFFERENCE
&& other_min_ttl_instant.saturating_duration_since(min_ttl_instant)
< DURATION_DIFFERENCE
}
(None, None) => true,
(None, Some(_)) => true,
(Some(_), None) => true,
}
}
_ => false,
}
}
/// Discover peers for a list of `SubnetDiscovery`.
DiscoverPeers(Vec<SubnetDiscovery>),
}
/// A particular subnet at a given slot.
@@ -116,9 +75,6 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
random_subnets: HashSetDelay<SubnetId>,
/// A collection of timeouts for when to start searching for peers for a particular shard.
discover_peers: HashSetDelay<ExactSubnet>,
/// A collection of timeouts for when to subscribe to a shard subnet.
subscriptions: HashSetDelay<ExactSubnet>,
@@ -172,7 +128,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
network_globals,
beacon_chain,
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
discover_peers: HashSetDelay::new(default_timeout),
subscriptions: HashSetDelay::new(default_timeout),
unsubscriptions: HashSetDelay::new(default_timeout),
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
@@ -198,6 +153,8 @@ impl<T: BeaconChainTypes> AttestationService<T> {
&mut self,
subscriptions: Vec<ValidatorSubscription>,
) -> Result<(), String> {
// Maps each subnet_id subscription to its highest slot
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
for subscription in subscriptions {
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);
//NOTE: We assume all subscriptions have been verified before reaching this service
@@ -226,15 +183,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
continue;
}
};
// Ensure each subnet_id inserted into the map has the highest slot as its value.
// Higher slot corresponds to higher min_ttl in the `SubnetDiscovery` entry.
if let Some(slot) = subnets_to_discover.get(&subnet_id) {
if subscription.slot > *slot {
subnets_to_discover.insert(subnet_id, subscription.slot);
}
} else {
subnets_to_discover.insert(subnet_id, subscription.slot);
}
let exact_subnet = ExactSubnet {
subnet_id,
slot: subscription.slot,
};
// determine if we should run a discovery lookup request and request it if required
if let Err(e) = self.discover_peers_request(exact_subnet.clone()) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
}
// determine if the validator is an aggregator. If so, we subscribe to the subnet and
// if successful add the validator to a mapping of known aggregators for that exact
@@ -264,6 +226,14 @@ impl<T: BeaconChainTypes> AttestationService<T> {
}
}
if let Err(e) = self.discover_peers_request(
subnets_to_discover
.into_iter()
.map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }),
) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
};
// pre-emptively wake the thread to check for new events
if let Some(waker) = &self.waker {
waker.wake_by_ref();
@@ -290,114 +260,55 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Checks if there are currently queued discovery requests and the time required to make the
/// request.
///
/// If there is sufficient time and no other request exists, queues a peer discovery request
/// for the required subnet.
fn discover_peers_request(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
/// If there is sufficient time, queues a peer discovery request for all the required subnets.
fn discover_peers_request(
&mut self,
exact_subnets: impl Iterator<Item = ExactSubnet>,
) -> Result<(), &'static str> {
let current_slot = self
.beacon_chain
.slot_clock
.now()
.ok_or_else(|| "Could not get the current slot")?;
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// if there is enough time to perform a discovery lookup
if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
// check if a discovery request already exists
if self.discover_peers.get(&exact_subnet).is_some() {
// already a request queued, end
return Ok(());
}
// if the slot is more than an epoch away, add an event to start looking for peers
if exact_subnet.slot
< current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
{
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl);
} else {
// Queue the discovery event to be executed for
// TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD
let duration_to_discover = {
let duration_to_next_slot = self
let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
.filter_map(|exact_subnet| {
// check if there is enough time to perform a discovery lookup
if exact_subnet.slot
>= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
{
// if the slot is more than an epoch away, add an event to start looking for peers
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| "Unable to determine duration to next slot")?;
// The -1 is done here to exclude the current slot duration, as we will use
// `duration_to_next_slot`.
let slots_until_discover = exact_subnet
.slot
.saturating_sub(current_slot)
.saturating_sub(1u64)
.saturating_sub(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD);
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
Some(SubnetDiscovery {
subnet_id: exact_subnet.subnet_id,
min_ttl,
})
} else {
// TODO: Send the time frame needed to have a peer connected, so that we can
// maintain peers for at least this duration.
// We may want to check the global PeerInfo to see estimated timeouts for each
// peer before they can be removed.
warn!(self.log,
"Not enough time for a discovery search";
"subnet_id" => format!("{:?}", exact_subnet)
);
None
}
})
.collect();
duration_to_next_slot + slot_duration * (slots_until_discover.as_u64() as u32)
};
self.discover_peers
.insert_at(exact_subnet, duration_to_discover);
}
} else {
// TODO: Send the time frame needed to have a peer connected, so that we can
// maintain peers for at least this duration.
// We may want to check the global PeerInfo to see estimated timeouts for each
// peer before they can be removed.
return Err("Not enough time for a discovery search");
if !discovery_subnets.is_empty() {
self.events
.push_back(AttServiceMessage::DiscoverPeers(discovery_subnets));
}
Ok(())
}
/// Checks if we have a discover peers event already and sends a new event if necessary
///
/// If a message exists for the same subnet, compare the `min_ttl` of the current and
/// existing messages and extend the existing message as necessary.
fn send_or_update_discovery_event(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
// track whether this message already exists in the event queue
let mut is_duplicate = false;
self.events.iter_mut().for_each(|event| {
if let AttServiceMessage::DiscoverPeers {
subnet_id: other_subnet_id,
min_ttl: other_min_ttl,
} = event
{
if subnet_id == *other_subnet_id {
let other_min_ttl_clone = *other_min_ttl;
match (min_ttl, other_min_ttl_clone) {
(Some(min_ttl_instant), Some(other_min_ttl_instant)) =>
// only update the min_ttl if it is greater than the existing min_ttl and a DURATION_DIFFERENCE padding
{
if min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
> DURATION_DIFFERENCE
{
*other_min_ttl = min_ttl;
}
}
(None, Some(_)) => {} // Keep the current one as it has an actual min_ttl
(Some(min_ttl), None) => {
// Update the request to include a min_ttl.
*other_min_ttl = Some(min_ttl);
}
(None, None) => {} // Duplicate message, do nothing.
}
is_duplicate = true;
return;
}
};
});
if !is_duplicate {
self.events
.push_back(AttServiceMessage::DiscoverPeers { subnet_id, min_ttl });
}
}
/// Checks the current random subnets and subscriptions to determine if a new subscription for this
/// subnet is required for the given slot.
///
@@ -547,7 +458,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if !already_subscribed {
// send a discovery request and a subscription
self.send_or_update_discovery_event(subnet_id, None);
self.events
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
min_ttl: None,
}]));
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
}
@@ -558,20 +473,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/* A collection of functions that handle the various timeouts */
/// Request a discovery query to find peers for a particular subnet.
fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) {
debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot);
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl)
}
/// A queued subscription is ready.
///
/// We add subscriptions events even if we are already subscribed to a random subnet (as these
@@ -731,15 +632,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
self.waker = Some(cx.waker().clone());
}
// process any peer discovery events
match self.discover_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peer discovery requests"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any subscription events
match self.subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet),

View File

@@ -8,7 +8,9 @@ mod tests {
migrate::NullMigrator,
};
use eth2_libp2p::discovery::{build_enr, Keypair};
use eth2_libp2p::{discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals};
use eth2_libp2p::{
discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery,
};
use futures::Stream;
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use lazy_static::lazy_static;
@@ -120,23 +122,21 @@ mod tests {
}
}
fn _get_subscriptions(
fn get_subscriptions(
validator_count: u64,
slot: Slot,
committee_count_at_slot: u64,
) -> Vec<ValidatorSubscription> {
let mut subscriptions: Vec<ValidatorSubscription> = Vec::new();
for validator_index in 0..validator_count {
let is_aggregator = true;
subscriptions.push(ValidatorSubscription {
validator_index,
attestation_committee_index: validator_index,
slot,
committee_count_at_slot,
is_aggregator,
});
}
subscriptions
(0..validator_count)
.map(|validator_index| {
get_subscription(
validator_index,
validator_index,
slot,
committee_count_at_slot,
)
})
.collect()
}
// gets a number of events from the subscription service, or returns none if it times out after a number
@@ -210,14 +210,7 @@ mod tests {
let events = get_events(attestation_service, no_events_expected, 1).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -270,14 +263,7 @@ mod tests {
let events = get_events(attestation_service, no_events_expected, 2).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -330,19 +316,15 @@ mod tests {
&attestation_service.beacon_chain.spec,
)
.unwrap();
let expected = vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }];
let expected = vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
min_ttl,
}])];
let events = get_events(attestation_service, no_events_expected, 1).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -396,21 +378,14 @@ mod tests {
)
.unwrap();
let expected = vec![
AttServiceMessage::DiscoverPeers { subnet_id, min_ttl },
AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { subnet_id, min_ttl }]),
AttServiceMessage::Subscribe(subnet_id),
];
let events = get_events(attestation_service, no_events_expected, 5).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -454,14 +429,7 @@ mod tests {
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -517,20 +485,16 @@ mod tests {
// expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range
let expected: Vec<AttServiceMessage> =
vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }];
vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
min_ttl,
}])];
let events = get_events(attestation_service, no_events_expected, 5).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
);
// if there are fewer events than expected, there's been a collision
if events.len() == no_events_expected {
@@ -553,7 +517,7 @@ mod tests {
.now()
.expect("Could not get current slot");
let subscriptions = _get_subscriptions(
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
@@ -572,10 +536,9 @@ mod tests {
for event in events {
match event {
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant,
} => discover_peer_count = discover_peer_count + 1,
AttServiceMessage::DiscoverPeers(_) => {
discover_peer_count = discover_peer_count + 1
}
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
_ => unexpected_msg_count = unexpected_msg_count + 1,
@@ -605,7 +568,7 @@ mod tests {
.now()
.expect("Could not get current slot");
let subscriptions = _get_subscriptions(
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
@@ -624,10 +587,9 @@ mod tests {
for event in events {
match event {
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant,
} => discover_peer_count = discover_peer_count + 1,
AttServiceMessage::DiscoverPeers(_) => {
discover_peer_count = discover_peer_count + 1
}
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
_ => unexpected_msg_count = unexpected_msg_count + 1,
@@ -639,4 +601,40 @@ mod tests {
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
}
#[tokio::test]
async fn test_discovery_peers_count() {
let subscription_slot = 10;
let validator_count = 32;
let committee_count = 1;
let expected_events = 97;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
validator_count,
current_slot + subscription_slot,
committee_count,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
let events = get_events(attestation_service, expected_events, 3).await;
let event = events.get(96);
if let Some(AttServiceMessage::DiscoverPeers(d)) = event {
assert_eq!(d.len(), validator_count as usize);
} else {
panic!("Unexpected event {:?}", event);
}
}
}

View File

@@ -36,22 +36,22 @@
//! task.
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
BlockError, ForkChoiceError,
};
use chain_segment::handle_chain_segment;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use environment::TaskExecutor;
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger};
use ssz::Encode;
use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
};
use worker::Worker;
mod chain_segment;
mod worker;
pub use chain_segment::ProcessId;
@@ -78,6 +78,18 @@ const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024;
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096;
/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;
/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
@@ -242,6 +254,54 @@ impl<E: EthSpec> WorkEvent<E> {
}
}
/// Create a new `Work` event for some exit.
pub fn gossip_voluntary_exit(
message_id: MessageId,
peer_id: PeerId,
voluntary_exit: Box<SignedVoluntaryExit>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipVoluntaryExit {
message_id,
peer_id,
voluntary_exit,
},
}
}
/// Create a new `Work` event for some proposer slashing.
pub fn gossip_proposer_slashing(
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: Box<ProposerSlashing>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipProposerSlashing {
message_id,
peer_id,
proposer_slashing,
},
}
}
/// Create a new `Work` event for some attester slashing.
pub fn gossip_attester_slashing(
message_id: MessageId,
peer_id: PeerId,
attester_slashing: Box<AttesterSlashing<E>>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipAttesterSlashing {
message_id,
peer_id,
attester_slashing,
},
}
}
/// Create a new `Work` event for some block, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(block: Box<SignedBeaconBlock<E>>) -> (Self, BlockResultReceiver<E>) {
@@ -282,6 +342,21 @@ pub enum Work<E: EthSpec> {
peer_id: PeerId,
block: Box<SignedBeaconBlock<E>>,
},
GossipVoluntaryExit {
message_id: MessageId,
peer_id: PeerId,
voluntary_exit: Box<SignedVoluntaryExit>,
},
GossipProposerSlashing {
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: Box<ProposerSlashing>,
},
GossipAttesterSlashing {
message_id: MessageId,
peer_id: PeerId,
attester_slashing: Box<AttesterSlashing<E>>,
},
RpcBlock {
block: Box<SignedBeaconBlock<E>>,
result_tx: BlockResultSender<E>,
@@ -299,6 +374,9 @@ impl<E: EthSpec> Work<E> {
Work::GossipAttestation { .. } => "gossip_attestation",
Work::GossipAggregate { .. } => "gossip_aggregate",
Work::GossipBlock { .. } => "gossip_block",
Work::GossipVoluntaryExit { .. } => "gossip_voluntary_exit",
Work::GossipProposerSlashing { .. } => "gossip_proposer_slashing",
Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing",
Work::RpcBlock { .. } => "rpc_block",
Work::ChainSegment { .. } => "chain_segment",
}
@@ -351,21 +429,33 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver<WorkEvent<T::EthSpec>>) {
let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
// Using LIFO queues for attestations since validator profits rely upon getting fresh
// attestations into blocks. Additionally, later attestations contain more information than
// earlier ones, so we consider them more valuable.
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
let mut aggregate_debounce = TimeLatch::default();
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
let mut attestation_debounce = TimeLatch::default();
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
// a strong feeling about queue type for exits.
let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
// queues with lots of junk messages.
let mut gossip_proposer_slashing_queue =
FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
let mut gossip_attester_slashing_queue =
FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let executor = self.executor.clone();
// The manager future will run on the non-blocking executor and delegate tasks to worker
// The manager future will run on the core executor and delegate tasks to worker
// threads on the blocking executor.
let manager_future = async move {
loop {
@@ -452,6 +542,18 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(idle_tx.clone(), item);
} else if let Some(item) = attestation_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
// Check slashings after all other consensus messages so we prioritize
// following head.
//
// Check attester slashings before proposer slashings since they have the
// potential to slash multiple validators at once.
} else if let Some(item) = gossip_attester_slashing_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
} else if let Some(item) = gossip_proposer_slashing_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
// Check exits last since our validators don't get rewards from them.
} else if let Some(item) = gossip_voluntary_exit_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
}
}
// There is no new work event and we are unable to spawn a new worker.
@@ -491,6 +593,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::GossipVoluntaryExit { .. } => {
gossip_voluntary_exit_queue.push(work, work_id, &self.log)
}
Work::GossipProposerSlashing { .. } => {
gossip_proposer_slashing_queue.push(work, work_id, &self.log)
}
Work::GossipAttesterSlashing { .. } => {
gossip_attester_slashing_queue.push(work, work_id, &self.log)
}
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
@@ -523,6 +634,18 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL,
gossip_voluntary_exit_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL,
gossip_proposer_slashing_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL,
gossip_attester_slashing_queue.len() as i64,
);
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
@@ -544,7 +667,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
}
};
// Spawn on the non-blocking executor.
// Spawn on the core executor.
executor.spawn(manager_future, MANAGER_TASK_NAME);
}
@@ -574,11 +697,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
return;
};
let network_tx = self.network_tx.clone();
let sync_tx = self.sync_tx.clone();
let log = self.log.clone();
let executor = self.executor.clone();
let worker = Worker {
chain,
network_tx: self.network_tx.clone(),
sync_tx: self.sync_tx.clone(),
log: self.log.clone(),
};
trace!(
self.log,
"Spawning beacon processor worker";
@@ -589,298 +717,85 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
executor.spawn_blocking(
move || {
let _worker_timer = worker_timer;
let inner_log = log.clone();
// We use this closure pattern to avoid using a `return` that prevents the
// `idle_tx` message from sending.
let handler = || {
let log = inner_log.clone();
match work {
/*
* Unaggregated attestation verification.
*/
Work::GossipAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
} => {
let beacon_block_root = attestation.data.beacon_block_root;
let attestation = match chain
.verify_unaggregated_attestation_for_gossip(*attestation, subnet_id)
{
Ok(attestation) => attestation,
Err(e) => {
handle_attestation_verification_failure(
&log,
sync_tx,
peer_id,
beacon_block_root,
"unaggregated",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
if !should_import {
return;
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
match e {
BeaconChainError::ForkChoiceError(
ForkChoiceError::InvalidAttestation(e),
) => debug!(
log,
"Attestation invalid for fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
e => error!(
log,
"Error applying attestation to fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
}
}
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) {
debug!(
log,
"Attestation invalid for agg pool";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
}
/*
* Aggregated attestation verification.
*/
Work::GossipAggregate {
message_id,
peer_id,
aggregate,
} => {
let beacon_block_root =
aggregate.message.aggregate.data.beacon_block_root;
let aggregate =
match chain.verify_aggregated_attestation_for_gossip(*aggregate) {
Ok(aggregate) => aggregate,
Err(e) => {
handle_attestation_verification_failure(
&log,
sync_tx,
peer_id,
beacon_block_root,
"aggregated",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) {
match e {
BeaconChainError::ForkChoiceError(
ForkChoiceError::InvalidAttestation(e),
) => debug!(
log,
"Aggregate invalid for fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
e => error!(
log,
"Error applying aggregate to fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
}
}
if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) {
debug!(
log,
"Attestation invalid for op pool";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
}
/*
* Verification for beacon blocks received on gossip.
*/
Work::GossipBlock {
message_id,
peer_id,
block,
} => {
let verified_block = match chain.verify_block_for_gossip(*block) {
Ok(verified_block) => {
info!(
log,
"New block received";
"slot" => verified_block.block.slot(),
"hash" => verified_block.block_root.to_string()
);
propagate_gossip_message(
network_tx,
message_id,
peer_id.clone(),
&log,
);
verified_block
}
Err(BlockError::ParentUnknown(block)) => {
send_sync_message(
sync_tx,
SyncMessage::UnknownBlock(peer_id, block),
&log,
);
return;
}
Err(BlockError::BlockIsAlreadyKnown) => {
debug!(
log,
"Gossip block is already known";
);
return;
}
Err(e) => {
warn!(
log,
"Could not verify block for gossip";
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL,
);
let block = Box::new(verified_block.block.clone());
match chain.process_block(verified_block) {
Ok(_block_root) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
);
trace!(
log,
"Gossipsub block processed";
"peer_id" => peer_id.to_string()
);
// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}
}
Err(BlockError::ParentUnknown { .. }) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
error!(
log,
"Block with unknown parent attempted to be processed";
"peer_id" => peer_id.to_string()
);
send_sync_message(
sync_tx,
SyncMessage::UnknownBlock(peer_id, block),
&log,
);
}
other => {
debug!(
log,
"Invalid gossip beacon block";
"outcome" => format!("{:?}", other),
"block root" => format!("{}", block.canonical_root()),
"block slot" => block.slot()
);
trace!(
log,
"Invalid gossip beacon block ssz";
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
);
}
};
}
/*
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock { block, result_tx } => {
let block_result = chain.process_block(*block);
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
);
if result_tx.send(block_result).is_err() {
crit!(log, "Failed return sync block result");
}
}
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => {
handle_chain_segment(chain, process_id, blocks, sync_tx, log)
}
};
match work {
/*
* Unaggregated attestation verification.
*/
Work::GossipAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
} => worker.process_gossip_attestation(
message_id,
peer_id,
*attestation,
subnet_id,
should_import,
),
/*
* Aggregated attestation verification.
*/
Work::GossipAggregate {
message_id,
peer_id,
aggregate,
} => worker.process_gossip_aggregate(message_id, peer_id, *aggregate),
/*
* Verification for beacon blocks received on gossip.
*/
Work::GossipBlock {
message_id,
peer_id,
block,
} => worker.process_gossip_block(message_id, peer_id, *block),
/*
* Voluntary exits received on gossip.
*/
Work::GossipVoluntaryExit {
message_id,
peer_id,
voluntary_exit,
} => worker.process_gossip_voluntary_exit(message_id, peer_id, *voluntary_exit),
/*
* Proposer slashings received on gossip.
*/
Work::GossipProposerSlashing {
message_id,
peer_id,
proposer_slashing,
} => worker.process_gossip_proposer_slashing(
message_id,
peer_id,
*proposer_slashing,
),
/*
* Attester slashings received on gossip.
*/
Work::GossipAttesterSlashing {
message_id,
peer_id,
attester_slashing,
} => worker.process_gossip_attester_slashing(
message_id,
peer_id,
*attester_slashing,
),
/*
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock { block, result_tx } => {
worker.process_rpc_block(*block, result_tx)
}
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => {
worker.process_chain_segment(process_id, blocks)
}
};
handler();
trace!(
log,
@@ -902,300 +817,3 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
);
}
}
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
/// the gossip network.
///
/// Creates a log if there is an internal error.
fn propagate_gossip_message<E: EthSpec>(
network_tx: mpsc::UnboundedSender<NetworkMessage<E>>,
message_id: MessageId,
peer_id: PeerId,
log: &Logger,
) {
network_tx
.send(NetworkMessage::Validate {
propagation_source: peer_id,
message_id,
})
.unwrap_or_else(|_| {
warn!(
log,
"Could not send propagation request to the network service"
)
});
}
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an internal error.
fn send_sync_message<E: EthSpec>(
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
message: SyncMessage<E>,
log: &Logger,
) {
sync_tx
.send(message)
.unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
}
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
pub fn handle_attestation_verification_failure<E: EthSpec>(
log: &Logger,
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
peer_id: PeerId,
beacon_block_root: Hash256,
attestation_type: &str,
error: AttnError,
) {
metrics::register_attestation_error(&error);
match &error {
AttnError::FutureEpoch { .. }
| AttnError::PastEpoch { .. }
| AttnError::FutureSlot { .. }
| AttnError::PastSlot { .. } => {
/*
* These errors can be triggered by a mismatch between our slot and the peer.
*
*
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
*/
}
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
/*
* These errors are caused by invalid signatures.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::EmptyAggregationBitfield => {
/*
* The aggregate had no signatures and is therefore worthless.
*
* Whilst we don't gossip this attestation, this act is **not** a clear
* violation of the spec nor indication of fault.
*
* This may change soon. Reference:
*
* https://github.com/ethereum/eth2.0-specs/pull/1732
*/
}
AttnError::AggregatorPubkeyUnknown(_) => {
/*
* The aggregator index was higher than any known validator index. This is
* possible in two cases:
*
* 1. The attestation is malformed
* 2. The attestation attests to a beacon_block_root that we do not know.
*
* It should be impossible to reach (2) without triggering
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
* faulty.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AggregatorNotInCommittee { .. } => {
/*
* The aggregator index was higher than any known validator index. This is
* possible in two cases:
*
* 1. The attestation is malformed
* 2. The attestation attests to a beacon_block_root that we do not know.
*
* It should be impossible to reach (2) without triggering
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
* faulty.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AttestationAlreadyKnown { .. } => {
/*
* The aggregate attestation has already been observed on the network or in
* a block.
*
* The peer is not necessarily faulty.
*/
trace!(
log,
"Attestation already known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::AggregatorAlreadyKnown(_) => {
/*
* There has already been an aggregate attestation seen from this
* aggregator index.
*
* The peer is not necessarily faulty.
*/
trace!(
log,
"Aggregator already known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::PriorAttestationKnown { .. } => {
/*
* We have already seen an attestation from this validator for this epoch.
*
* The peer is not necessarily faulty.
*/
trace!(
log,
"Prior attestation known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::ValidatorIndexTooHigh(_) => {
/*
* The aggregator index (or similar field) was higher than the maximum
* possible number of validators.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::UnknownHeadBlock { beacon_block_root } => {
// Note: it's a little unclear whether this block is unknown or
// just old. See:
//
// https://github.com/sigp/lighthouse/issues/1039
// TODO: Maintain this attestation and re-process once sync completes
debug!(
log,
"Attestation for unknown block";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root)
);
// we don't know the block, get the sync manager to handle the block lookup
sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
.unwrap_or_else(|_| {
warn!(
log,
"Failed to send to sync service";
"msg" => "UnknownBlockHash"
)
});
return;
}
AttnError::UnknownTargetRoot(_) => {
/*
* The block indicated by the target root is not known to us.
*
* We should always get `AttnError::UnknownHeadBlock` before we get this
* error, so this means we can get this error if:
*
* 1. The target root does not represent a valid block.
* 2. We do not have the target root in our DB.
*
* For (2), we should only be processing attestations when we should have
* all the available information. Note: if we do a weak-subjectivity sync
* it's possible that this situation could occur, but I think it's
* unlikely. For now, we will declare this to be an invalid message.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::BadTargetEpoch => {
/*
* The aggregator index (or similar field) was higher than the maximum
* possible number of validators.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::NoCommitteeForSlotAndIndex { .. } => {
/*
* It is not possible to attest to the given committee in the given slot.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::NotExactlyOneAggregationBitSet(_) => {
/*
* The unaggregated attestation doesn't have only one signature.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AttestsToFutureBlock { .. } => {
/*
* The beacon_block_root is from a higher slot than the attestation.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::InvalidSubnetId { received, expected } => {
/*
* The attestation was received on an incorrect subnet id.
*/
debug!(
log,
"Received attestation on incorrect subnet";
"expected" => format!("{:?}", expected),
"received" => format!("{:?}", received),
)
}
AttnError::Invalid(_) => {
/*
* The attestation failed the state_processing verification.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::TooManySkippedSlots {
head_block_slot,
attestation_slot,
} => {
/*
* The attestation references a head block that is too far behind the attestation slot.
*
* The message is not necessarily invalid, but we choose to ignore it.
*/
debug!(
log,
"Rejected long skip slot attestation";
"head_block_slot" => head_block_slot,
"attestation_slot" => attestation_slot,
)
}
AttnError::BeaconChainError(e) => {
/*
* Lighthouse hit an unexpected error whilst processing the attestation. It
* should be impossible to trigger a `BeaconChainError` from the network,
* so we have a bug.
*
* It's not clear if the message is invalid/malicious.
*/
error!(
log,
"Unable to validate aggregate";
"peer_id" => peer_id.to_string(),
"error" => format!("{:?}", e),
);
}
}
debug!(
log,
"Invalid attestation from network";
"reason" => format!("{:?}", error),
"block" => format!("{}", beacon_block_root),
"peer_id" => peer_id.to_string(),
"type" => format!("{:?}", attestation_type),
);
}

View File

@@ -0,0 +1,726 @@
use super::{
chain_segment::{handle_chain_segment, ProcessId},
BlockResultSender,
};
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{
attestation_verification::Error as AttnError, observed_operations::ObservationOutcome,
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
};
use eth2_libp2p::{MessageId, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger};
use ssz::Encode;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
};
/// Contains the context necessary to import blocks, attestations, etc to the beacon chain.
pub struct Worker<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
pub log: Logger,
}
impl<T: BeaconChainTypes> Worker<T> {
/// Process the unaggregated attestation received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
/// - Attempt to apply it to fork choice.
/// - Attempt to add it to the naive aggregation pool.
///
/// Raises a log if there are errors.
pub fn process_gossip_attestation(
self,
message_id: MessageId,
peer_id: PeerId,
attestation: Attestation<T::EthSpec>,
subnet_id: SubnetId,
should_import: bool,
) {
let beacon_block_root = attestation.data.beacon_block_root;
let attestation = match self
.chain
.verify_unaggregated_attestation_for_gossip(attestation, subnet_id)
{
Ok(attestation) => attestation,
Err(e) => {
self.handle_attestation_verification_failure(
peer_id,
beacon_block_root,
"unaggregated",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_gossip_message(message_id, peer_id.clone());
if !should_import {
return;
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL);
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) {
match e {
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
debug!(
self.log,
"Attestation invalid for fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
e => error!(
self.log,
"Error applying attestation to fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
}
}
if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) {
debug!(
self.log,
"Attestation invalid for agg pool";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL);
}
/// Process the aggregated attestation received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
/// - Attempt to apply it to fork choice.
/// - Attempt to add it to the block inclusion pool.
///
/// Raises a log if there are errors.
pub fn process_gossip_aggregate(
self,
message_id: MessageId,
peer_id: PeerId,
aggregate: SignedAggregateAndProof<T::EthSpec>,
) {
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
let aggregate = match self
.chain
.verify_aggregated_attestation_for_gossip(aggregate)
{
Ok(aggregate) => aggregate,
Err(e) => {
self.handle_attestation_verification_failure(
peer_id,
beacon_block_root,
"aggregated",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_gossip_message(message_id, peer_id.clone());
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) {
match e {
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
debug!(
self.log,
"Aggregate invalid for fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
e => error!(
self.log,
"Error applying aggregate to fork choice";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
),
}
}
if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) {
debug!(
self.log,
"Attestation invalid for op pool";
"reason" => format!("{:?}", e),
"peer" => peer_id.to_string(),
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL);
}
/// Process the beacon block received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
/// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to
/// be downloaded.
///
/// Raises a log if there are errors.
pub fn process_gossip_block(
self,
message_id: MessageId,
peer_id: PeerId,
block: SignedBeaconBlock<T::EthSpec>,
) {
let verified_block = match self.chain.verify_block_for_gossip(block) {
Ok(verified_block) => {
info!(
self.log,
"New block received";
"slot" => verified_block.block.slot(),
"hash" => verified_block.block_root.to_string()
);
self.propagate_gossip_message(message_id, peer_id.clone());
verified_block
}
Err(BlockError::ParentUnknown(block)) => {
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
return;
}
Err(BlockError::BlockIsAlreadyKnown) => {
debug!(
self.log,
"Gossip block is already known";
);
return;
}
Err(e) => {
warn!(
self.log,
"Could not verify block for gossip";
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
let block = Box::new(verified_block.block.clone());
match self.chain.process_block(verified_block) {
Ok(_block_root) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
trace!(
self.log,
"Gossipsub block processed";
"peer_id" => peer_id.to_string()
);
// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// work.
match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}
}
Err(BlockError::ParentUnknown { .. }) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
error!(
self.log,
"Block with unknown parent attempted to be processed";
"peer_id" => peer_id.to_string()
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
}
other => {
debug!(
self.log,
"Invalid gossip beacon block";
"outcome" => format!("{:?}", other),
"block root" => format!("{}", block.canonical_root()),
"block slot" => block.slot()
);
trace!(
self.log,
"Invalid gossip beacon block ssz";
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
);
}
};
}
pub fn process_gossip_voluntary_exit(
self,
message_id: MessageId,
peer_id: PeerId,
voluntary_exit: SignedVoluntaryExit,
) {
let validator_index = voluntary_exit.message.validator_index;
let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) {
Ok(ObservationOutcome::New(exit)) => exit,
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping exit for already exiting validator";
"validator_index" => validator_index,
"peer" => peer_id.to_string()
);
return;
}
Err(e) => {
debug!(
self.log,
"Dropping invalid exit";
"validator_index" => validator_index,
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id);
self.chain.import_voluntary_exit(exit);
debug!(self.log, "Successfully imported voluntary exit");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL);
}
pub fn process_gossip_proposer_slashing(
self,
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: ProposerSlashing,
) {
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;
let slashing = match self
.chain
.verify_proposer_slashing_for_gossip(proposer_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping proposer slashing";
"reason" => "Already seen a proposer slashing for that validator",
"validator_index" => validator_index,
"peer" => peer_id.to_string()
);
return;
}
Err(e) => {
debug!(
self.log,
"Dropping invalid proposer slashing";
"validator_index" => validator_index,
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id);
self.chain.import_proposer_slashing(slashing);
debug!(self.log, "Successfully imported proposer slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
}
pub fn process_gossip_attester_slashing(
self,
message_id: MessageId,
peer_id: PeerId,
attester_slashing: AttesterSlashing<T::EthSpec>,
) {
let slashing = match self
.chain
.verify_attester_slashing_for_gossip(attester_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping attester slashing";
"reason" => "Slashings already known for all slashed validators",
"peer" => peer_id.to_string()
);
return;
}
Err(e) => {
debug!(
self.log,
"Dropping invalid attester slashing";
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id);
if let Err(e) = self.chain.import_attester_slashing(slashing) {
debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e));
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL);
} else {
debug!(self.log, "Successfully imported attester slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
}
}
/// Attempt to process a block received from a direct RPC request, returning the processing
/// result on the `result_tx` channel.
///
/// Raises a log if there are errors publishing the result to the channel.
pub fn process_rpc_block(
self,
block: SignedBeaconBlock<T::EthSpec>,
result_tx: BlockResultSender<T::EthSpec>,
) {
let block_result = self.chain.process_block(block);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
if result_tx.send(block_result).is_err() {
crit!(self.log, "Failed to return sync block result");
}
}
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub fn process_chain_segment(
self,
process_id: ProcessId,
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
) {
handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log)
}
/// Send a message on `network_tx` that the `message_id` sent by `peer_id` should be propagated on
/// the gossip network.
///
/// Creates a log if there is an internal error.
fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) {
self.network_tx
.send(NetworkMessage::Validate {
propagation_source: peer_id,
message_id,
})
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send propagation request to the network service"
)
});
}
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an internal error.
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
self.sync_tx
.send(message)
.unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service"));
}
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
pub fn handle_attestation_verification_failure(
&self,
peer_id: PeerId,
beacon_block_root: Hash256,
attestation_type: &str,
error: AttnError,
) {
metrics::register_attestation_error(&error);
match &error {
AttnError::FutureEpoch { .. }
| AttnError::PastEpoch { .. }
| AttnError::FutureSlot { .. }
| AttnError::PastSlot { .. } => {
/*
* These errors can be triggered by a mismatch between our slot and the peer.
*
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
*/
}
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
/*
* These errors are caused by invalid signatures.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::EmptyAggregationBitfield => {
/*
* The aggregate had no signatures and is therefore worthless.
*
* Whilst we don't gossip this attestation, this act is **not** a clear
* violation of the spec nor indication of fault.
*
* This may change soon. Reference:
*
* https://github.com/ethereum/eth2.0-specs/pull/1732
*/
}
AttnError::AggregatorPubkeyUnknown(_) => {
/*
* The aggregator index was higher than any known validator index. This is
* possible in two cases:
*
* 1. The attestation is malformed
* 2. The attestation attests to a beacon_block_root that we do not know.
*
* It should be impossible to reach (2) without triggering
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
* faulty.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AggregatorNotInCommittee { .. } => {
/*
* The aggregator is not in the committee for the attestation's slot and
* index. This is possible in two cases:
*
* 1. The attestation is malformed
* 2. The attestation attests to a beacon_block_root that we do not know.
*
* It should be impossible to reach (2) without triggering
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
* faulty.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AttestationAlreadyKnown { .. } => {
/*
* The aggregate attestation has already been observed on the network or in
* a block.
*
* The peer is not necessarily faulty.
*/
trace!(
self.log,
"Attestation already known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::AggregatorAlreadyKnown(_) => {
/*
* There has already been an aggregate attestation seen from this
* aggregator index.
*
* The peer is not necessarily faulty.
*/
trace!(
self.log,
"Aggregator already known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::PriorAttestationKnown { .. } => {
/*
* We have already seen an attestation from this validator for this epoch.
*
* The peer is not necessarily faulty.
*/
trace!(
self.log,
"Prior attestation known";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type),
);
return;
}
AttnError::ValidatorIndexTooHigh(_) => {
/*
* The aggregator index (or similar field) was higher than the maximum
* possible number of validators.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::UnknownHeadBlock { beacon_block_root } => {
// Note: it's a little unclear whether this block is unknown or
// just old. See:
//
// https://github.com/sigp/lighthouse/issues/1039
// TODO: Maintain this attestation and re-process once sync completes
debug!(
self.log,
"Attestation for unknown block";
"peer_id" => peer_id.to_string(),
"block" => format!("{}", beacon_block_root)
);
// We don't know the block; ask the sync manager to handle the block lookup.
self.sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
.unwrap_or_else(|_| {
warn!(
self.log,
"Failed to send to sync service";
"msg" => "UnknownBlockHash"
)
});
return;
}
AttnError::UnknownTargetRoot(_) => {
/*
* The block indicated by the target root is not known to us.
*
* We should always get `AttnError::UnknownHeadBlock` before we get this
* error, so we can only get this error if:
*
* 1. The target root does not represent a valid block.
* 2. We do not have the target root in our DB.
*
* For (2), we should only be processing attestations once we have all
* the available information. Note: if we do a weak-subjectivity sync
* it's possible that this situation could occur, but I think it's
* unlikely. For now, we will declare this to be an invalid message.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::BadTargetEpoch => {
/*
* The target epoch in the attestation does not match the epoch of the
* attestation's slot.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::NoCommitteeForSlotAndIndex { .. } => {
/*
* It is not possible to attest to the given committee in the given slot.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::NotExactlyOneAggregationBitSet(_) => {
/*
* The unaggregated attestation doesn't have exactly one aggregation bit set.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::AttestsToFutureBlock { .. } => {
/*
* The beacon_block_root is from a higher slot than the attestation.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::InvalidSubnetId { received, expected } => {
/*
* The attestation was received on an incorrect subnet id.
*/
debug!(
self.log,
"Received attestation on incorrect subnet";
"expected" => format!("{:?}", expected),
"received" => format!("{:?}", received),
)
}
AttnError::Invalid(_) => {
/*
* The attestation failed the state_processing verification.
*
* The peer has published an invalid consensus message.
*/
}
AttnError::TooManySkippedSlots {
head_block_slot,
attestation_slot,
} => {
/*
* The attestation references a head block that is too far behind the attestation slot.
*
* The message is not necessarily invalid, but we choose to ignore it.
*/
debug!(
self.log,
"Rejected long skip slot attestation";
"head_block_slot" => head_block_slot,
"attestation_slot" => attestation_slot,
)
}
AttnError::BeaconChainError(e) => {
/*
* Lighthouse hit an unexpected error whilst processing the attestation. It
* should be impossible to trigger a `BeaconChainError` from the network,
* so we have a bug.
*
* It's not clear if the message is invalid/malicious.
*/
error!(
self.log,
"Unable to validate aggregate";
"peer_id" => peer_id.to_string(),
"error" => format!("{:?}", e),
);
}
}
debug!(
self.log,
"Invalid attestation from network";
"reason" => format!("{:?}", error),
"block" => format!("{}", beacon_block_root),
"peer_id" => peer_id.to_string(),
"type" => format!("{:?}", attestation_type),
);
}
}
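
A notable design point above is that every `process_*` method takes `self` by value: a `Worker` is built for a single unit of work and consumed when that work completes, so no verification state leaks between events. A minimal sketch of that one-shot pattern, with stand-in types (`Chain`, `Event`, `Worker`) rather than the Lighthouse ones:

```rust
use std::sync::Arc;

// Stand-ins for the real beacon chain and work types.
struct Chain {
    head_slot: u64,
}

enum Event {
    Block(u64),
    Attestation(u64),
}

struct Worker {
    chain: Arc<Chain>,
}

impl Worker {
    // Taking `self` by value: the worker is consumed after one event.
    fn process(self, event: Event) {
        match event {
            Event::Block(slot) => {
                println!("block at slot {} (head: {})", slot, self.chain.head_slot)
            }
            Event::Attestation(slot) => {
                println!("attestation for slot {} (head: {})", slot, self.chain.head_slot)
            }
        }
    }
}

fn main() {
    let chain = Arc::new(Chain { head_slot: 0 });
    for event in [Event::Block(1), Event::Attestation(1)] {
        // A fresh worker per event; cloning the `Arc` is cheap.
        Worker { chain: chain.clone() }.process(event);
    }
}
```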

View File

@@ -98,6 +98,49 @@ lazy_static! {
"beacon_processor_gossip_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc."
);
// Gossip Exits.
pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_exit_queue_total",
"Count of exits from gossip waiting to be verified."
);
pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_exit_verified_total",
"Total number of voluntary exits verified for propagation."
);
pub static ref BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_exit_imported_total",
"Total number of voluntary exits imported to the op pool."
);
// Gossip proposer slashings.
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_proposer_slashing_queue_total",
"Count of proposer slashings from gossip waiting to be verified."
);
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_proposer_slashing_verified_total",
"Total number of proposer slashings verified for propagation."
);
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_proposer_slashing_imported_total",
"Total number of proposer slashings imported to the op pool."
);
// Gossip attester slashings.
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_attester_slashing_queue_total",
"Count of attester slashings from gossip waiting to be verified."
);
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_attester_slashing_verified_total",
"Total number of attester slashings verified for propagation."
);
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_attester_slashing_imported_total",
"Total number of attester slashings imported to the op pool."
);
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_attester_slashing_error_total",
"Total number of attester slashings that raised an error during processing."
);
// Rpc blocks.
pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_block_queue_total",

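For context, the counters above follow the usual lazy-static Prometheus pattern: define the metric once, register it, and bump it at the relevant code path. A rough equivalent using the `prometheus` and `lazy_static` crates directly — Lighthouse wraps these in its own `try_create_*` helpers, so the exact API here is illustrative:

```rust
use lazy_static::lazy_static;
use prometheus::{IntCounter, Registry};

lazy_static! {
    static ref EXIT_IMPORTED_TOTAL: IntCounter = IntCounter::new(
        "beacon_processor_exit_imported_total",
        "Total number of voluntary exits imported to the op pool.",
    )
    .expect("metric is well formed");
}

fn main() {
    let registry = Registry::new();
    registry
        .register(Box::new(EXIT_IMPORTED_TOTAL.clone()))
        .expect("metric registers once");

    // Incremented at the same points as `metrics::inc_counter(...)` above.
    EXIT_IMPORTED_TOTAL.inc();
    assert_eq!(EXIT_IMPORTED_TOTAL.get(), 1);
}
```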
View File

@@ -16,7 +16,7 @@ use eth2_libp2p::{
};
use futures::prelude::*;
use processor::Processor;
use slog::{debug, o, trace, warn};
use slog::{debug, o, trace};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@@ -26,8 +26,6 @@ use types::EthSpec;
/// passing them to the internal message processor. The message processor spawns a syncing thread
/// which manages which blocks need to be requested and processed.
pub struct Router<T: BeaconChainTypes> {
/// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
/// Access to the peer db for logging.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// Processes validated and decoded messages from the network. Has direct access to the
@@ -89,13 +87,12 @@ impl<T: BeaconChainTypes> Router<T> {
executor.clone(),
beacon_chain,
network_globals.clone(),
network_send.clone(),
network_send,
&log,
);
// generate the Message handler
let mut handler = Router {
network_send,
network_globals,
processor,
log: message_handler_log,
@@ -232,13 +229,7 @@ impl<T: BeaconChainTypes> Router<T> {
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id));
if let Some(verified_exit) = self
.processor
.verify_voluntary_exit_for_gossip(&peer_id, *exit)
{
self.propagate_message(id, peer_id);
self.processor.import_verified_voluntary_exit(verified_exit);
}
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);
}
PubsubMessage::ProposerSlashing(proposer_slashing) => {
debug!(
@@ -246,14 +237,8 @@ impl<T: BeaconChainTypes> Router<T> {
"Received a proposer slashing";
"peer_id" => format!("{}", peer_id)
);
if let Some(verified_proposer_slashing) = self
.processor
.verify_proposer_slashing_for_gossip(&peer_id, *proposer_slashing)
{
self.propagate_message(id, peer_id);
self.processor
.import_verified_proposer_slashing(verified_proposer_slashing);
}
self.processor
.on_proposer_slashing_gossip(id, peer_id, proposer_slashing);
}
PubsubMessage::AttesterSlashing(attester_slashing) => {
debug!(
@@ -261,30 +246,9 @@ impl<T: BeaconChainTypes> Router<T> {
"Received a attester slashing";
"peer_id" => format!("{}", peer_id)
);
if let Some(verified_attester_slashing) = self
.processor
.verify_attester_slashing_for_gossip(&peer_id, *attester_slashing)
{
self.propagate_message(id, peer_id);
self.processor
.import_verified_attester_slashing(verified_attester_slashing);
}
self.processor
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
}
}
}
/// Informs the network service that the message should be forwarded to other peers (is valid).
fn propagate_message(&mut self, message_id: MessageId, propagation_source: PeerId) {
self.network_send
.send(NetworkMessage::Validate {
propagation_source,
message_id,
})
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send propagation request to the network service"
)
});
}
}
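
The net effect of this hunk is a thinner router: it no longer verifies or propagates anything itself, it just hands decoded messages to the processor, which queues them for the `BeaconProcessor`. A self-contained sketch of that delegation shape, with simplified stand-ins for `PubsubMessage` and `Processor`:

```rust
// Simplified stand-ins for the real message and processor types.
enum PubsubMessage {
    VoluntaryExit(u64),
    ProposerSlashing(u64),
}

struct Processor;

impl Processor {
    fn on_voluntary_exit_gossip(&mut self, validator_index: u64) {
        println!("queued exit for validator {}", validator_index);
    }
    fn on_proposer_slashing_gossip(&mut self, proposer_index: u64) {
        println!("queued slashing for proposer {}", proposer_index);
    }
}

fn route(processor: &mut Processor, message: PubsubMessage) {
    // No verification here any more; just hand the message off.
    match message {
        PubsubMessage::VoluntaryExit(i) => processor.on_voluntary_exit_gossip(i),
        PubsubMessage::ProposerSlashing(i) => processor.on_proposer_slashing_gossip(i),
    }
}

fn main() {
    let mut processor = Processor;
    route(&mut processor, PubsubMessage::VoluntaryExit(7));
    route(&mut processor, PubsubMessage::ProposerSlashing(9));
}
```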

View File

@@ -3,14 +3,13 @@ use crate::beacon_processor::{
};
use crate::service::NetworkMessage;
use crate::sync::{PeerSyncInfo, SyncMessage};
use beacon_chain::{observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{
MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response,
};
use itertools::process_results;
use slog::{debug, error, o, trace, warn};
use state_processing::SigVerifiedOp;
use std::cmp;
use std::sync::Arc;
use tokio::sync::mpsc;
@@ -585,140 +584,70 @@ impl<T: BeaconChainTypes> Processor<T> {
})
}
/// Verify a voluntary exit before gossiping or processing it.
///
/// Errors are logged at debug level.
pub fn verify_voluntary_exit_for_gossip(
&self,
peer_id: &PeerId,
voluntary_exit: SignedVoluntaryExit,
) -> Option<SigVerifiedOp<SignedVoluntaryExit>> {
let validator_index = voluntary_exit.message.validator_index;
match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) {
Ok(ObservationOutcome::New(sig_verified_exit)) => Some(sig_verified_exit),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping exit for already exiting validator";
"validator_index" => validator_index,
"peer" => peer_id.to_string()
);
None
}
Err(e) => {
debug!(
self.log,
"Dropping invalid exit";
"validator_index" => validator_index,
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
None
}
}
}
/// Import a verified exit into the op pool.
pub fn import_verified_voluntary_exit(
&self,
verified_voluntary_exit: SigVerifiedOp<SignedVoluntaryExit>,
pub fn on_voluntary_exit_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
voluntary_exit: Box<SignedVoluntaryExit>,
) {
self.chain.import_voluntary_exit(verified_voluntary_exit);
debug!(self.log, "Successfully imported voluntary exit");
self.beacon_processor_send
.try_send(BeaconWorkEvent::gossip_voluntary_exit(
message_id,
peer_id,
voluntary_exit,
))
.unwrap_or_else(|e| {
error!(
&self.log,
"Unable to send to gossip processor";
"type" => "voluntary exit gossip",
"error" => e.to_string(),
)
})
}
/// Verify a proposer slashing before gossiping or processing it.
///
/// Errors are logged at debug level.
pub fn verify_proposer_slashing_for_gossip(
&self,
peer_id: &PeerId,
proposer_slashing: ProposerSlashing,
) -> Option<SigVerifiedOp<ProposerSlashing>> {
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;
match self
.chain
.verify_proposer_slashing_for_gossip(proposer_slashing)
{
Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping proposer slashing";
"reason" => "Already seen a proposer slashing for that validator",
"validator_index" => validator_index,
"peer" => peer_id.to_string()
);
None
}
Err(e) => {
debug!(
self.log,
"Dropping invalid proposer slashing";
"validator_index" => validator_index,
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
None
}
}
}
/// Import a verified proposer slashing into the op pool.
pub fn import_verified_proposer_slashing(
&self,
proposer_slashing: SigVerifiedOp<ProposerSlashing>,
pub fn on_proposer_slashing_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: Box<ProposerSlashing>,
) {
self.chain.import_proposer_slashing(proposer_slashing);
debug!(self.log, "Successfully imported proposer slashing");
self.beacon_processor_send
.try_send(BeaconWorkEvent::gossip_proposer_slashing(
message_id,
peer_id,
proposer_slashing,
))
.unwrap_or_else(|e| {
error!(
&self.log,
"Unable to send to gossip processor";
"type" => "proposer slashing gossip",
"error" => e.to_string(),
)
})
}
/// Verify an attester slashing before gossiping or processing it.
///
/// Errors are logged at debug level.
pub fn verify_attester_slashing_for_gossip(
&self,
peer_id: &PeerId,
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Option<SigVerifiedOp<AttesterSlashing<T::EthSpec>>> {
match self
.chain
.verify_attester_slashing_for_gossip(attester_slashing)
{
Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
self.log,
"Dropping attester slashing";
"reason" => "Slashings already known for all slashed validators",
"peer" => peer_id.to_string()
);
None
}
Err(e) => {
debug!(
self.log,
"Dropping invalid attester slashing";
"peer" => peer_id.to_string(),
"error" => format!("{:?}", e)
);
None
}
}
}
/// Import a verified attester slashing into the op pool.
pub fn import_verified_attester_slashing(
&self,
attester_slashing: SigVerifiedOp<AttesterSlashing<T::EthSpec>>,
pub fn on_attester_slashing_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
) {
if let Err(e) = self.chain.import_attester_slashing(attester_slashing) {
debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e));
} else {
debug!(self.log, "Successfully imported attester slashing");
}
self.beacon_processor_send
.try_send(BeaconWorkEvent::gossip_attester_slashing(
message_id,
peer_id,
attester_slashing,
))
.unwrap_or_else(|e| {
error!(
&self.log,
"Unable to send to gossip processor";
"type" => "attester slashing gossip",
"error" => e.to_string(),
)
})
}
}
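
All three handlers now funnel into `beacon_processor_send.try_send(...)` on a bounded queue: when a flood of exits or slashings arrives, excess work fails fast with an error log instead of buffering without limit. A small sketch of that backpressure behaviour using a bounded `tokio` channel; `WorkEvent` is a stand-in for `BeaconWorkEvent`:

```rust
use tokio::sync::mpsc;

// Stand-in for `BeaconWorkEvent`.
#[derive(Debug)]
enum WorkEvent {
    AttesterSlashing(u64),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<WorkEvent>(2);

    // A flood of slashings: only the first two fit in the queue.
    for i in 0..5 {
        if let Err(e) = tx.try_send(WorkEvent::AttesterSlashing(i)) {
            eprintln!("Unable to send to gossip processor: {}", e);
        }
    }

    drop(tx);
    while let Some(event) = rx.recv().await {
        println!("processing {:?}", event);
    }
}
```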

View File

@@ -169,6 +169,7 @@ fn spawn_service<T: BeaconChainTypes>(
mut service: NetworkService<T>,
) -> error::Result<()> {
let mut exit_rx = executor.exit();
let mut shutdown_sender = executor.shutdown_sender();
// spawn on the current executor
executor.spawn_without_exit(async move {
@@ -271,8 +272,8 @@ fn spawn_service<T: BeaconChainTypes>(
AttServiceMessage::EnrRemove(subnet_id) => {
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
}
AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => {
service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl);
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
service.libp2p.swarm.discover_subnet_peers(subnets_to_discover);
}
}
}
@@ -376,6 +377,12 @@ fn spawn_service<T: BeaconChainTypes>(
Libp2pEvent::NewListenAddr(multiaddr) => {
service.network_globals.listen_multiaddrs.write().push(multiaddr);
}
Libp2pEvent::ZeroListeners => {
let _ = shutdown_sender.send("All listeners are closed. Unable to listen").await.map_err(|e| {
warn!(service.log, "Failed to send a shutdown signal"; "error" => e.to_string())
});
}
}
}
}

View File

@@ -40,7 +40,13 @@ mod tests {
let runtime = Runtime::new().unwrap();
let (signal, exit) = exit_future::signal();
let executor = environment::TaskExecutor::new(runtime.handle().clone(), exit, log.clone());
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = environment::TaskExecutor::new(
runtime.handle().clone(),
exit,
log.clone(),
shutdown_tx,
);
let mut config = NetworkConfig::default();
config.libp2p_port = 21212;

View File

@@ -110,7 +110,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
}
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) {
debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action"=> action.to_string());
debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action" => action.to_string());
self.network_send
.send(NetworkMessage::ReportPeer { peer_id, action })
.unwrap_or_else(|_| {

View File

@@ -1,6 +1,6 @@
[package]
name = "boot_node"
version = "0.2.5"
version = "0.2.6"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

View File

@@ -10,7 +10,7 @@ use target_info::Target;
/// `Lighthouse/v0.2.0-1419501f2+`
pub const VERSION: &str = git_version!(
args = ["--always", "--dirty=+"],
prefix = "Lighthouse/v0.2.5-",
prefix = "Lighthouse/v0.2.6-",
fallback = "unknown"
);

View File

@@ -1,7 +1,7 @@
[package]
name = "lcli"
description = "Lighthouse CLI (modeled after zcli)"
version = "0.2.5"
version = "0.2.6"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "lighthouse"
version = "0.2.5"
version = "0.2.6"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

View File

@@ -1,4 +1,5 @@
use crate::metrics;
use futures::channel::mpsc::Sender;
use futures::prelude::*;
use slog::{debug, trace};
use tokio::runtime::Handle;
@@ -10,6 +11,12 @@ pub struct TaskExecutor {
pub(crate) handle: Handle,
/// The receiver exit future which on receiving shuts down the task
pub(crate) exit: exit_future::Exit,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
///
/// The task must provide a reason for shutting down.
pub(crate) signal_tx: Sender<&'static str>,
pub(crate) log: slog::Logger,
}
@@ -18,8 +25,18 @@ impl TaskExecutor {
///
/// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from
/// a [`RuntimeContext`](struct.RuntimeContext.html)
pub fn new(handle: Handle, exit: exit_future::Exit, log: slog::Logger) -> Self {
Self { handle, exit, log }
pub fn new(
handle: Handle,
exit: exit_future::Exit,
log: slog::Logger,
signal_tx: Sender<&'static str>,
) -> Self {
Self {
handle,
exit,
signal_tx,
log,
}
}
/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled
@@ -51,7 +68,7 @@ impl TaskExecutor {
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
/// like [spawn](#method.spawn).
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
/// ensure that the task gets canceled appropriately.
/// This function generates prometheus metrics on number of tasks and task duration.
///
@@ -121,6 +138,11 @@ impl TaskExecutor {
self.exit.clone()
}
/// Get a channel to request shutting down.
pub fn shutdown_sender(&self) -> Sender<&'static str> {
self.signal_tx.clone()
}
/// Returns a reference to the logger.
pub fn log(&self) -> &slog::Logger {
&self.log

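The new `signal_tx` field and `shutdown_sender()` getter give any task a way to abort the whole process with a reason, which is how the network service reacts to `Libp2pEvent::ZeroListeners` above. A minimal sketch of the pattern with the `futures` mpsc channel used here; the service body is hypothetical:

```rust
use futures::channel::mpsc::{channel, Sender};
use futures::{SinkExt, StreamExt};

// A hypothetical long-running service that hits an unrecoverable state.
async fn run_service(mut shutdown_tx: Sender<&'static str>) {
    let _ = shutdown_tx
        .send("All listeners are closed. Unable to listen")
        .await;
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel::<&'static str>(1);
    tokio::spawn(run_service(tx));

    // The environment blocks on the receiver side of this channel.
    if let Some(reason) = rx.next().await {
        println!("Internal shutdown received; reason: {}", reason);
    }
}
```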
View File

@@ -9,7 +9,11 @@
use eth2_config::Eth2Config;
use eth2_testnet_config::Eth2TestnetConfig;
use futures::channel::oneshot;
use futures::channel::{
mpsc::{channel, Receiver, Sender},
oneshot,
};
use futures::{future, StreamExt};
pub use executor::TaskExecutor;
use slog::{info, o, Drain, Level, Logger};
@@ -260,10 +264,13 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
/// Consumes the builder, returning an `Environment`.
pub fn build(self) -> Result<Environment<E>, String> {
let (signal, exit) = exit_future::signal();
let (signal_tx, signal_rx) = channel(1);
Ok(Environment {
runtime: self
.runtime
.ok_or_else(|| "Cannot build environment without runtime".to_string())?,
signal_tx,
signal_rx: Some(signal_rx),
signal: Some(signal),
exit,
log: self
@@ -295,6 +302,7 @@ impl<E: EthSpec> RuntimeContext<E> {
Self {
executor: TaskExecutor {
handle: self.executor.handle.clone(),
signal_tx: self.executor.signal_tx.clone(),
exit: self.executor.exit.clone(),
log: self.executor.log.new(o!("service" => service_name)),
},
@@ -318,6 +326,10 @@ impl<E: EthSpec> RuntimeContext<E> {
/// validator client, or to run tests that involve logging and async task execution.
pub struct Environment<E: EthSpec> {
runtime: Runtime,
/// Receiver side of an internal shutdown signal.
signal_rx: Option<Receiver<&'static str>>,
/// Sender to request shutting down.
signal_tx: Sender<&'static str>,
signal: Option<exit_future::Signal>,
exit: exit_future::Exit,
log: Logger,
@@ -340,6 +352,7 @@ impl<E: EthSpec> Environment<E> {
RuntimeContext {
executor: TaskExecutor {
exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(),
handle: self.runtime().handle().clone(),
log: self.log.clone(),
},
@@ -353,6 +366,7 @@ impl<E: EthSpec> Environment<E> {
RuntimeContext {
executor: TaskExecutor {
exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(),
handle: self.runtime().handle().clone(),
log: self.log.new(o!("service" => service_name)),
},
@@ -361,8 +375,20 @@ impl<E: EthSpec> Environment<E> {
}
}
/// Block the current thread until Ctrl+C is received.
pub fn block_until_ctrl_c(&mut self) -> Result<(), String> {
/// Block the current thread until a shutdown signal is received.
///
/// This can be either the user Ctrl-C'ing or a task requesting to shutdown.
pub fn block_until_shutdown_requested(&mut self) -> Result<(), String> {
// future of a task requesting to shutdown
let mut rx = self
.signal_rx
.take()
.ok_or("Inner shutdown already received")?;
let inner_shutdown =
async move { rx.next().await.ok_or("Internal shutdown channel exhausted") };
futures::pin_mut!(inner_shutdown);
// setup for handling a Ctrl-C
let (ctrlc_send, ctrlc_oneshot) = oneshot::channel();
let ctrlc_send_c = RefCell::new(Some(ctrlc_send));
ctrlc::set_handler(move || {
@@ -372,10 +398,18 @@ impl<E: EthSpec> Environment<E> {
})
.map_err(|e| format!("Could not set ctrlc handler: {:?}", e))?;
// Block this thread until Ctrl+C is pressed.
self.runtime()
.block_on(ctrlc_oneshot)
.map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))
// Block this thread until a shutdown signal is received.
match self
.runtime()
.block_on(future::select(inner_shutdown, ctrlc_oneshot))
{
future::Either::Left((Ok(reason), _)) => {
info!(self.log, "Internal shutdown received"; "reason" => reason);
Ok(())
}
future::Either::Left((Err(e), _)) => Err(e.into()),
future::Either::Right((x, _)) => x.map_err(|e| format!("Ctrlc oneshot failed: {}", e)),
}
}
/// Shutdown the `tokio` runtime when all tasks are idle.

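The control flow in `block_until_shutdown_requested` is a straightforward race between two futures. A self-contained sketch of the same `future::select` shape — here the internal channel fires immediately, so the `Left` branch wins, and the Ctrl-C oneshot is simulated and never sent:

```rust
use futures::channel::{mpsc, oneshot};
use futures::{future, StreamExt};

#[tokio::main]
async fn main() {
    let (mut shutdown_tx, mut shutdown_rx) = mpsc::channel::<&'static str>(1);
    // Keep the sender alive (prefixed with `_`) so the oneshot stays pending.
    let (_ctrlc_tx, ctrlc_rx) = oneshot::channel::<()>();

    // A task requests shutdown; in the real code Ctrl-C could win instead.
    shutdown_tx.try_send("demo: internal shutdown").unwrap();

    let inner_shutdown = async move {
        shutdown_rx
            .next()
            .await
            .ok_or("Internal shutdown channel exhausted")
    };
    futures::pin_mut!(inner_shutdown);

    match future::select(inner_shutdown, ctrlc_rx).await {
        future::Either::Left((Ok(reason), _)) => {
            println!("Internal shutdown received; reason: {}", reason)
        }
        future::Either::Left((Err(e), _)) => eprintln!("{}", e),
        future::Either::Right((res, _)) => println!("Ctrl-C received: {:?}", res),
    }
}
```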
View File

@@ -300,8 +300,8 @@ fn run<E: EthSpec>(
return Err("No subcommand supplied.".into());
}
// Block this thread until Ctrl+C is pressed.
environment.block_until_ctrl_c()?;
// Block this thread until we get a ctrl-c or a task sends a shutdown signal.
environment.block_until_shutdown_requested()?;
info!(log, "Shutting down..");
environment.fire_signal();

View File

@@ -16,5 +16,6 @@ exec lighthouse \
--testnet-dir $TESTNET_DIR \
--dummy-eth1 \
--http \
--port 9902 \
--http-port 6052 \
--boot-nodes $(cat $BEACON_DIR/beacon/network/enr.dat)

View File

@@ -1,6 +1,6 @@
[package]
name = "validator_client"
version = "0.2.5"
version = "0.2.6"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>", "Luke Anderson <luke@lukeanderson.com.au>"]
edition = "2018"