mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-04 09:11:42 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ebb25b5569 | ||
|
|
bbed42f30c | ||
|
|
fdc6e2aa8e | ||
|
|
8e7dd7b2b1 |
17
Cargo.lock
generated
17
Cargo.lock
generated
@@ -2,7 +2,7 @@
|
||||
# It is not intended for manual editing.
|
||||
[[package]]
|
||||
name = "account_manager"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"bls",
|
||||
@@ -373,7 +373,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "beacon_node"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"beacon_chain",
|
||||
"clap",
|
||||
@@ -530,7 +530,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "boot_node"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"discv5",
|
||||
@@ -1699,9 +1699,9 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.16"
|
||||
version = "1.0.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e"
|
||||
checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crc32fast",
|
||||
@@ -2523,7 +2523,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "lcli"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"bls",
|
||||
"clap",
|
||||
@@ -2874,14 +2874,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af67924b8dd885cccea261866c8ce5b74d239d272e154053ff927dae839f5ae9"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lighthouse"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"account_manager",
|
||||
"account_utils",
|
||||
@@ -6017,7 +6016,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "validator_client"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"bls",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "account_manager"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Paul Hauner <paul@paulhauner.com>", "Luke Anderson <luke@sigmaprime.io>"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "beacon_node"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
|
||||
use crate::rpc::*;
|
||||
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic};
|
||||
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
|
||||
use crate::Eth2Enr;
|
||||
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use futures::prelude::*;
|
||||
@@ -29,7 +29,6 @@ use std::{
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
time::Instant,
|
||||
};
|
||||
use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};
|
||||
|
||||
@@ -301,8 +300,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
|
||||
/// would like to retain the peers for.
|
||||
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
|
||||
self.peer_manager.discover_subnet_peers(subnet_id, min_ttl)
|
||||
pub fn discover_subnet_peers(&mut self, subnet_subscriptions: Vec<SubnetDiscovery>) {
|
||||
self.peer_manager
|
||||
.discover_subnet_peers(subnet_subscriptions)
|
||||
}
|
||||
|
||||
/// Updates the local ENR's "eth2" field with the latest EnrForkId.
|
||||
|
||||
@@ -8,7 +8,7 @@ pub use enr_ext::{CombinedKeyExt, EnrExt};
|
||||
pub use libp2p::core::identity::Keypair;
|
||||
|
||||
use crate::metrics;
|
||||
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
|
||||
use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery};
|
||||
use discv5::{enr::NodeId, Discv5, Discv5Event};
|
||||
use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY};
|
||||
use futures::prelude::*;
|
||||
@@ -305,12 +305,19 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
}
|
||||
|
||||
/// Processes a request to search for more peers on a subnet.
|
||||
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
|
||||
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
|
||||
// If the discv5 service isn't running, ignore queries
|
||||
if !self.started {
|
||||
return;
|
||||
}
|
||||
self.add_subnet_query(subnet_id, min_ttl, 0);
|
||||
debug!(
|
||||
self.log,
|
||||
"Making discovery query for subnets";
|
||||
"subnets" => format!("{:?}", subnets_to_discover.iter().map(|s| s.subnet_id).collect::<Vec<_>>())
|
||||
);
|
||||
for subnet in subnets_to_discover {
|
||||
self.add_subnet_query(subnet.subnet_id, subnet.min_ttl, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add an ENR to the routing table of the discovery mechanism.
|
||||
@@ -514,6 +521,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
// This query is for searching for peers of a particular subnet
|
||||
// Drain subnet_queries so we can re-use it as we continue to process the queue
|
||||
let grouped_queries: Vec<SubnetQuery> = subnet_queries.drain(..).collect();
|
||||
debug!(
|
||||
self.log,
|
||||
"Starting grouped subnet query";
|
||||
"subnets" => format!("{:?}", grouped_queries.iter().map(|q| q.subnet_id).collect::<Vec<_>>()),
|
||||
);
|
||||
self.start_subnet_query(grouped_queries);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
///! The subnet predicate used for searching for a particular subnet.
|
||||
use super::*;
|
||||
use slog::{debug, trace};
|
||||
use std::ops::Deref;
|
||||
|
||||
/// Returns the predicate for a given subnet.
|
||||
@@ -30,7 +31,7 @@ where
|
||||
.collect();
|
||||
|
||||
if matches.is_empty() {
|
||||
debug!(
|
||||
trace!(
|
||||
log_clone,
|
||||
"Peer found but not on any of the desired subnets";
|
||||
"peer_id" => format!("{}", enr.peer_id())
|
||||
|
||||
@@ -14,7 +14,7 @@ pub mod rpc;
|
||||
mod service;
|
||||
pub mod types;
|
||||
|
||||
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage};
|
||||
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
|
||||
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
|
||||
pub use config::Config as NetworkConfig;
|
||||
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
|
||||
|
||||
@@ -4,7 +4,7 @@ pub use self::peerdb::*;
|
||||
use crate::discovery::{Discovery, DiscoveryEvent};
|
||||
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
|
||||
use crate::{error, metrics};
|
||||
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId};
|
||||
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
|
||||
use futures::prelude::*;
|
||||
use futures::Stream;
|
||||
use hashset_delay::HashSetDelay;
|
||||
@@ -19,7 +19,7 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use types::{EthSpec, SubnetId};
|
||||
use types::EthSpec;
|
||||
|
||||
pub use libp2p::core::{identity::Keypair, Multiaddr};
|
||||
|
||||
@@ -213,17 +213,19 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
}
|
||||
|
||||
/// A request to find peers on a given subnet.
|
||||
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
|
||||
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
|
||||
// Extend the time to maintain peers if required.
|
||||
if let Some(min_ttl) = min_ttl {
|
||||
self.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.extend_peers_on_subnet(subnet_id, min_ttl);
|
||||
for s in subnets_to_discover.iter() {
|
||||
if let Some(min_ttl) = s.min_ttl {
|
||||
self.network_globals
|
||||
.peers
|
||||
.write()
|
||||
.extend_peers_on_subnet(s.subnet_id, min_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
// request the subnet query from discovery
|
||||
self.discovery.discover_subnet_peers(subnet_id, min_ttl);
|
||||
self.discovery.discover_subnet_peers(subnets_to_discover);
|
||||
}
|
||||
|
||||
/// A STATUS message has been received from a peer. This resets the status timer.
|
||||
|
||||
@@ -36,6 +36,8 @@ pub enum Libp2pEvent<TSpec: EthSpec> {
|
||||
Behaviour(BehaviourEvent<TSpec>),
|
||||
/// A new listening address has been established.
|
||||
NewListenAddr(Multiaddr),
|
||||
/// We reached zero listening addresses.
|
||||
ZeroListeners,
|
||||
}
|
||||
|
||||
/// The configuration and state of the libp2p components for the beacon node.
|
||||
@@ -283,10 +285,17 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string())
|
||||
}
|
||||
SwarmEvent::ListenerClosed { addresses, reason } => {
|
||||
crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason))
|
||||
crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason));
|
||||
if Swarm::listeners(&self.swarm).count() == 0 {
|
||||
return Libp2pEvent::ZeroListeners;
|
||||
}
|
||||
}
|
||||
SwarmEvent::ListenerError { error } => {
|
||||
warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()))
|
||||
// this is non fatal, but we still check
|
||||
warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()));
|
||||
if Swarm::listeners(&self.swarm).count() == 0 {
|
||||
return Libp2pEvent::ZeroListeners;
|
||||
}
|
||||
}
|
||||
SwarmEvent::Dialing(peer_id) => {
|
||||
debug!(self.log, "Dialing peer"; "peer_id" => peer_id.to_string());
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod error;
|
||||
mod globals;
|
||||
mod pubsub;
|
||||
mod subnet;
|
||||
mod sync_state;
|
||||
mod topics;
|
||||
|
||||
@@ -13,5 +14,6 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
|
||||
|
||||
pub use globals::NetworkGlobals;
|
||||
pub use pubsub::PubsubMessage;
|
||||
pub use subnet::SubnetDiscovery;
|
||||
pub use sync_state::SyncState;
|
||||
pub use topics::{GossipEncoding, GossipKind, GossipTopic};
|
||||
|
||||
28
beacon_node/eth2_libp2p/src/types/subnet.rs
Normal file
28
beacon_node/eth2_libp2p/src/types/subnet.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
use std::time::{Duration, Instant};
|
||||
use types::SubnetId;
|
||||
|
||||
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
|
||||
|
||||
/// A subnet to discover peers on along with the instant after which it's no longer useful.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SubnetDiscovery {
|
||||
pub subnet_id: SubnetId,
|
||||
pub min_ttl: Option<Instant>,
|
||||
}
|
||||
|
||||
impl PartialEq for SubnetDiscovery {
|
||||
fn eq(&self, other: &SubnetDiscovery) -> bool {
|
||||
self.subnet_id == other.subnet_id
|
||||
&& match (self.min_ttl, other.min_ttl) {
|
||||
(Some(min_ttl_instant), Some(other_min_ttl_instant)) => {
|
||||
min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
|
||||
< DURATION_DIFFERENCE
|
||||
&& other_min_ttl_instant.saturating_duration_since(min_ttl_instant)
|
||||
< DURATION_DIFFERENCE
|
||||
}
|
||||
(None, None) => true,
|
||||
(None, Some(_)) => true,
|
||||
(Some(_), None) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -94,8 +94,13 @@ pub async fn build_libp2p_instance(boot_nodes: Vec<Enr>, log: slog::Logger) -> L
|
||||
// launch libp2p service
|
||||
|
||||
let (signal, exit) = exit_future::signal();
|
||||
let executor =
|
||||
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = environment::TaskExecutor::new(
|
||||
tokio::runtime::Handle::current(),
|
||||
exit,
|
||||
log.clone(),
|
||||
shutdown_tx,
|
||||
);
|
||||
Libp2pInstance(
|
||||
LibP2PService::new(executor, &config, EnrForkId::default(), &log)
|
||||
.await
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
|
||||
//! determines whether attestations should be aggregated and/or passed to the beacon node.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -13,7 +13,7 @@ use rand::seq::SliceRandom;
|
||||
use slog::{crit, debug, error, o, trace, warn};
|
||||
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::{types::GossipKind, NetworkGlobals};
|
||||
use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery};
|
||||
use hashset_delay::HashSetDelay;
|
||||
use rest_types::ValidatorSubscription;
|
||||
use slot_clock::SlotClock;
|
||||
@@ -25,11 +25,8 @@ mod tests;
|
||||
|
||||
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
|
||||
/// slot is less than this number, skip the peer discovery process.
|
||||
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 1;
|
||||
/// The number of slots ahead that we attempt to discover peers for a subscription. If the slot to
|
||||
/// attest to is greater than this, we queue a discovery request for this many slots prior to
|
||||
/// subscribing.
|
||||
const TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 6;
|
||||
/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s.
|
||||
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
|
||||
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
|
||||
/// gossip topics that we subscribed to due to the validator connection.
|
||||
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
|
||||
@@ -39,12 +36,10 @@ const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
|
||||
/// Note: The time is calculated as `time = milliseconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`.
|
||||
const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
|
||||
/// The default number of slots before items in hash delay sets used by this class should expire.
|
||||
/// 36s at 12s slot time
|
||||
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
|
||||
// 36s at 12s slot time
|
||||
/// The duration difference between two instance for them to be considered equal.
|
||||
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
|
||||
|
||||
#[derive(Debug, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum AttServiceMessage {
|
||||
/// Subscribe to the specified subnet id.
|
||||
Subscribe(SubnetId),
|
||||
@@ -54,44 +49,8 @@ pub enum AttServiceMessage {
|
||||
EnrAdd(SubnetId),
|
||||
/// Remove the `SubnetId` from the ENR bitfield.
|
||||
EnrRemove(SubnetId),
|
||||
/// Discover peers for a particular subnet.
|
||||
/// The includes the `Instant` we need the discovered peer until.
|
||||
DiscoverPeers {
|
||||
subnet_id: SubnetId,
|
||||
min_ttl: Option<Instant>,
|
||||
},
|
||||
}
|
||||
|
||||
impl PartialEq for AttServiceMessage {
|
||||
fn eq(&self, other: &AttServiceMessage) -> bool {
|
||||
match (self, other) {
|
||||
(&AttServiceMessage::Subscribe(a), &AttServiceMessage::Subscribe(b)) => a == b,
|
||||
(&AttServiceMessage::Unsubscribe(a), &AttServiceMessage::Unsubscribe(b)) => a == b,
|
||||
(&AttServiceMessage::EnrAdd(a), &AttServiceMessage::EnrAdd(b)) => a == b,
|
||||
(&AttServiceMessage::EnrRemove(a), &AttServiceMessage::EnrRemove(b)) => a == b,
|
||||
(
|
||||
&AttServiceMessage::DiscoverPeers { subnet_id, min_ttl },
|
||||
&AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: other_subnet_id,
|
||||
min_ttl: other_min_ttl,
|
||||
},
|
||||
) => {
|
||||
subnet_id == other_subnet_id
|
||||
&& match (min_ttl, other_min_ttl) {
|
||||
(Some(min_ttl_instant), Some(other_min_ttl_instant)) => {
|
||||
min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
|
||||
< DURATION_DIFFERENCE
|
||||
&& other_min_ttl_instant.saturating_duration_since(min_ttl_instant)
|
||||
< DURATION_DIFFERENCE
|
||||
}
|
||||
(None, None) => true,
|
||||
(None, Some(_)) => true,
|
||||
(Some(_), None) => true,
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
/// Discover peers for a list of `SubnetDiscovery`.
|
||||
DiscoverPeers(Vec<SubnetDiscovery>),
|
||||
}
|
||||
|
||||
/// A particular subnet at a given slot.
|
||||
@@ -116,9 +75,6 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
|
||||
random_subnets: HashSetDelay<SubnetId>,
|
||||
|
||||
/// A collection of timeouts for when to start searching for peers for a particular shard.
|
||||
discover_peers: HashSetDelay<ExactSubnet>,
|
||||
|
||||
/// A collection of timeouts for when to subscribe to a shard subnet.
|
||||
subscriptions: HashSetDelay<ExactSubnet>,
|
||||
|
||||
@@ -172,7 +128,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
network_globals,
|
||||
beacon_chain,
|
||||
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
|
||||
discover_peers: HashSetDelay::new(default_timeout),
|
||||
subscriptions: HashSetDelay::new(default_timeout),
|
||||
unsubscriptions: HashSetDelay::new(default_timeout),
|
||||
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
|
||||
@@ -198,6 +153,8 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
&mut self,
|
||||
subscriptions: Vec<ValidatorSubscription>,
|
||||
) -> Result<(), String> {
|
||||
// Maps each subnet_id subscription to it's highest slot
|
||||
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
|
||||
for subscription in subscriptions {
|
||||
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);
|
||||
//NOTE: We assume all subscriptions have been verified before reaching this service
|
||||
@@ -226,15 +183,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// Ensure each subnet_id inserted into the map has the highest slot as it's value.
|
||||
// Higher slot corresponds to higher min_ttl in the `SubnetDiscovery` entry.
|
||||
if let Some(slot) = subnets_to_discover.get(&subnet_id) {
|
||||
if subscription.slot > *slot {
|
||||
subnets_to_discover.insert(subnet_id, subscription.slot);
|
||||
}
|
||||
} else {
|
||||
subnets_to_discover.insert(subnet_id, subscription.slot);
|
||||
}
|
||||
|
||||
let exact_subnet = ExactSubnet {
|
||||
subnet_id,
|
||||
slot: subscription.slot,
|
||||
};
|
||||
// determine if we should run a discovery lookup request and request it if required
|
||||
if let Err(e) = self.discover_peers_request(exact_subnet.clone()) {
|
||||
warn!(self.log, "Discovery lookup request error"; "error" => e);
|
||||
}
|
||||
|
||||
// determine if the validator is an aggregator. If so, we subscribe to the subnet and
|
||||
// if successful add the validator to a mapping of known aggregators for that exact
|
||||
@@ -264,6 +226,14 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.discover_peers_request(
|
||||
subnets_to_discover
|
||||
.into_iter()
|
||||
.map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }),
|
||||
) {
|
||||
warn!(self.log, "Discovery lookup request error"; "error" => e);
|
||||
};
|
||||
|
||||
// pre-emptively wake the thread to check for new events
|
||||
if let Some(waker) = &self.waker {
|
||||
waker.wake_by_ref();
|
||||
@@ -290,114 +260,55 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// Checks if there are currently queued discovery requests and the time required to make the
|
||||
/// request.
|
||||
///
|
||||
/// If there is sufficient time and no other request exists, queues a peer discovery request
|
||||
/// for the required subnet.
|
||||
fn discover_peers_request(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
|
||||
/// If there is sufficient time, queues a peer discovery request for all the required subnets.
|
||||
fn discover_peers_request(
|
||||
&mut self,
|
||||
exact_subnets: impl Iterator<Item = ExactSubnet>,
|
||||
) -> Result<(), &'static str> {
|
||||
let current_slot = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.now()
|
||||
.ok_or_else(|| "Could not get the current slot")?;
|
||||
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
|
||||
|
||||
// if there is enough time to perform a discovery lookup
|
||||
if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
|
||||
// check if a discovery request already exists
|
||||
if self.discover_peers.get(&exact_subnet).is_some() {
|
||||
// already a request queued, end
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// if the slot is more than epoch away, add an event to start looking for peers
|
||||
if exact_subnet.slot
|
||||
< current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
|
||||
{
|
||||
// add one slot to ensure we keep the peer for the subscription slot
|
||||
let min_ttl = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(exact_subnet.slot + 1)
|
||||
.map(|duration| std::time::Instant::now() + duration);
|
||||
|
||||
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl);
|
||||
} else {
|
||||
// Queue the discovery event to be executed for
|
||||
// TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD
|
||||
|
||||
let duration_to_discover = {
|
||||
let duration_to_next_slot = self
|
||||
let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
|
||||
.filter_map(|exact_subnet| {
|
||||
// check if there is enough time to perform a discovery lookup
|
||||
if exact_subnet.slot
|
||||
>= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
|
||||
{
|
||||
// if the slot is more than epoch away, add an event to start looking for peers
|
||||
// add one slot to ensure we keep the peer for the subscription slot
|
||||
let min_ttl = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_next_slot()
|
||||
.ok_or_else(|| "Unable to determine duration to next slot")?;
|
||||
// The -1 is done here to exclude the current slot duration, as we will use
|
||||
// `duration_to_next_slot`.
|
||||
let slots_until_discover = exact_subnet
|
||||
.slot
|
||||
.saturating_sub(current_slot)
|
||||
.saturating_sub(1u64)
|
||||
.saturating_sub(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD);
|
||||
.duration_to_slot(exact_subnet.slot + 1)
|
||||
.map(|duration| std::time::Instant::now() + duration);
|
||||
Some(SubnetDiscovery {
|
||||
subnet_id: exact_subnet.subnet_id,
|
||||
min_ttl,
|
||||
})
|
||||
} else {
|
||||
// TODO: Send the time frame needed to have a peer connected, so that we can
|
||||
// maintain peers for a least this duration.
|
||||
// We may want to check the global PeerInfo to see estimated timeouts for each
|
||||
// peer before they can be removed.
|
||||
warn!(self.log,
|
||||
"Not enough time for a discovery search";
|
||||
"subnet_id" => format!("{:?}", exact_subnet)
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
duration_to_next_slot + slot_duration * (slots_until_discover.as_u64() as u32)
|
||||
};
|
||||
|
||||
self.discover_peers
|
||||
.insert_at(exact_subnet, duration_to_discover);
|
||||
}
|
||||
} else {
|
||||
// TODO: Send the time frame needed to have a peer connected, so that we can
|
||||
// maintain peers for a least this duration.
|
||||
// We may want to check the global PeerInfo to see estimated timeouts for each
|
||||
// peer before they can be removed.
|
||||
return Err("Not enough time for a discovery search");
|
||||
if !discovery_subnets.is_empty() {
|
||||
self.events
|
||||
.push_back(AttServiceMessage::DiscoverPeers(discovery_subnets));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Checks if we have a discover peers event already and sends a new event if necessary
|
||||
///
|
||||
/// If a message exists for the same subnet, compare the `min_ttl` of the current and
|
||||
/// existing messages and extend the existing message as necessary.
|
||||
fn send_or_update_discovery_event(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
|
||||
// track whether this message already exists in the event queue
|
||||
let mut is_duplicate = false;
|
||||
|
||||
self.events.iter_mut().for_each(|event| {
|
||||
if let AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: other_subnet_id,
|
||||
min_ttl: other_min_ttl,
|
||||
} = event
|
||||
{
|
||||
if subnet_id == *other_subnet_id {
|
||||
let other_min_ttl_clone = *other_min_ttl;
|
||||
match (min_ttl, other_min_ttl_clone) {
|
||||
(Some(min_ttl_instant), Some(other_min_ttl_instant)) =>
|
||||
// only update the min_ttl if it is greater than the existing min_ttl and a DURATION_DIFFERENCE padding
|
||||
{
|
||||
if min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
|
||||
> DURATION_DIFFERENCE
|
||||
{
|
||||
*other_min_ttl = min_ttl;
|
||||
}
|
||||
}
|
||||
(None, Some(_)) => {} // Keep the current one as it has an actual min_ttl
|
||||
(Some(min_ttl), None) => {
|
||||
// Update the request to include a min_ttl.
|
||||
*other_min_ttl = Some(min_ttl);
|
||||
}
|
||||
(None, None) => {} // Duplicate message, do nothing.
|
||||
}
|
||||
is_duplicate = true;
|
||||
return;
|
||||
}
|
||||
};
|
||||
});
|
||||
if !is_duplicate {
|
||||
self.events
|
||||
.push_back(AttServiceMessage::DiscoverPeers { subnet_id, min_ttl });
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks the current random subnets and subscriptions to determine if a new subscription for this
|
||||
/// subnet is required for the given slot.
|
||||
///
|
||||
@@ -547,7 +458,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
if !already_subscribed {
|
||||
// send a discovery request and a subscription
|
||||
self.send_or_update_discovery_event(subnet_id, None);
|
||||
self.events
|
||||
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet_id,
|
||||
min_ttl: None,
|
||||
}]));
|
||||
self.events
|
||||
.push_back(AttServiceMessage::Subscribe(subnet_id));
|
||||
}
|
||||
@@ -558,20 +473,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
/* A collection of functions that handle the various timeouts */
|
||||
|
||||
/// Request a discovery query to find peers for a particular subnet.
|
||||
fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) {
|
||||
debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot);
|
||||
|
||||
// add one slot to ensure we keep the peer for the subscription slot
|
||||
let min_ttl = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(exact_subnet.slot + 1)
|
||||
.map(|duration| std::time::Instant::now() + duration);
|
||||
|
||||
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl)
|
||||
}
|
||||
|
||||
/// A queued subscription is ready.
|
||||
///
|
||||
/// We add subscriptions events even if we are already subscribed to a random subnet (as these
|
||||
@@ -731,15 +632,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
self.waker = Some(cx.waker().clone());
|
||||
}
|
||||
|
||||
// process any peer discovery events
|
||||
match self.discover_peers.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for peer discovery requests"; "error"=> e);
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
// process any subscription events
|
||||
match self.subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet),
|
||||
|
||||
@@ -8,7 +8,9 @@ mod tests {
|
||||
migrate::NullMigrator,
|
||||
};
|
||||
use eth2_libp2p::discovery::{build_enr, Keypair};
|
||||
use eth2_libp2p::{discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals};
|
||||
use eth2_libp2p::{
|
||||
discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery,
|
||||
};
|
||||
use futures::Stream;
|
||||
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
|
||||
use lazy_static::lazy_static;
|
||||
@@ -120,23 +122,21 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn _get_subscriptions(
|
||||
fn get_subscriptions(
|
||||
validator_count: u64,
|
||||
slot: Slot,
|
||||
committee_count_at_slot: u64,
|
||||
) -> Vec<ValidatorSubscription> {
|
||||
let mut subscriptions: Vec<ValidatorSubscription> = Vec::new();
|
||||
for validator_index in 0..validator_count {
|
||||
let is_aggregator = true;
|
||||
subscriptions.push(ValidatorSubscription {
|
||||
validator_index,
|
||||
attestation_committee_index: validator_index,
|
||||
slot,
|
||||
committee_count_at_slot,
|
||||
is_aggregator,
|
||||
});
|
||||
}
|
||||
subscriptions
|
||||
(0..validator_count)
|
||||
.map(|validator_index| {
|
||||
get_subscription(
|
||||
validator_index,
|
||||
validator_index,
|
||||
slot,
|
||||
committee_count_at_slot,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// gets a number of events from the subscription service, or returns none if it times out after a number
|
||||
@@ -210,14 +210,7 @@ mod tests {
|
||||
let events = get_events(attestation_service, no_events_expected, 1).await;
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any1),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -270,14 +263,7 @@ mod tests {
|
||||
let events = get_events(attestation_service, no_events_expected, 2).await;
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any1),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -330,19 +316,15 @@ mod tests {
|
||||
&attestation_service.beacon_chain.spec,
|
||||
)
|
||||
.unwrap();
|
||||
let expected = vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }];
|
||||
let expected = vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet_id,
|
||||
min_ttl,
|
||||
}])];
|
||||
|
||||
let events = get_events(attestation_service, no_events_expected, 1).await;
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any2),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -396,21 +378,14 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
let expected = vec![
|
||||
AttServiceMessage::DiscoverPeers { subnet_id, min_ttl },
|
||||
AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { subnet_id, min_ttl }]),
|
||||
AttServiceMessage::Subscribe(subnet_id),
|
||||
];
|
||||
|
||||
let events = get_events(attestation_service, no_events_expected, 5).await;
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any2),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -454,14 +429,7 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any2),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -517,20 +485,16 @@ mod tests {
|
||||
|
||||
// expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range
|
||||
let expected: Vec<AttServiceMessage> =
|
||||
vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }];
|
||||
vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet_id,
|
||||
min_ttl,
|
||||
}])];
|
||||
|
||||
let events = get_events(attestation_service, no_events_expected, 5).await;
|
||||
|
||||
assert_matches!(
|
||||
events[..3],
|
||||
[
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant
|
||||
},
|
||||
AttServiceMessage::Subscribe(_any2),
|
||||
AttServiceMessage::EnrAdd(_any3)
|
||||
]
|
||||
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)]
|
||||
);
|
||||
// if there are fewer events than expected, there's been a collision
|
||||
if events.len() == no_events_expected {
|
||||
@@ -553,7 +517,7 @@ mod tests {
|
||||
.now()
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let subscriptions = _get_subscriptions(
|
||||
let subscriptions = get_subscriptions(
|
||||
subscription_count,
|
||||
current_slot + subscription_slot,
|
||||
committee_count,
|
||||
@@ -572,10 +536,9 @@ mod tests {
|
||||
|
||||
for event in events {
|
||||
match event {
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant,
|
||||
} => discover_peer_count = discover_peer_count + 1,
|
||||
AttServiceMessage::DiscoverPeers(_) => {
|
||||
discover_peer_count = discover_peer_count + 1
|
||||
}
|
||||
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
|
||||
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
|
||||
_ => unexpected_msg_count = unexpected_msg_count + 1,
|
||||
@@ -605,7 +568,7 @@ mod tests {
|
||||
.now()
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let subscriptions = _get_subscriptions(
|
||||
let subscriptions = get_subscriptions(
|
||||
subscription_count,
|
||||
current_slot + subscription_slot,
|
||||
committee_count,
|
||||
@@ -624,10 +587,9 @@ mod tests {
|
||||
|
||||
for event in events {
|
||||
match event {
|
||||
AttServiceMessage::DiscoverPeers {
|
||||
subnet_id: _any_subnet,
|
||||
min_ttl: _any_instant,
|
||||
} => discover_peer_count = discover_peer_count + 1,
|
||||
AttServiceMessage::DiscoverPeers(_) => {
|
||||
discover_peer_count = discover_peer_count + 1
|
||||
}
|
||||
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
|
||||
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
|
||||
_ => unexpected_msg_count = unexpected_msg_count + 1,
|
||||
@@ -639,4 +601,40 @@ mod tests {
|
||||
assert_eq!(enr_add_count, 64);
|
||||
assert_eq!(unexpected_msg_count, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_discovery_peers_count() {
|
||||
let subscription_slot = 10;
|
||||
let validator_count = 32;
|
||||
let committee_count = 1;
|
||||
let expected_events = 97;
|
||||
|
||||
// create the attestation service and subscriptions
|
||||
let mut attestation_service = get_attestation_service();
|
||||
let current_slot = attestation_service
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.now()
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let subscriptions = get_subscriptions(
|
||||
validator_count,
|
||||
current_slot + subscription_slot,
|
||||
committee_count,
|
||||
);
|
||||
|
||||
// submit sthe subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.unwrap();
|
||||
|
||||
let events = get_events(attestation_service, expected_events, 3).await;
|
||||
|
||||
let event = events.get(96);
|
||||
if let Some(AttServiceMessage::DiscoverPeers(d)) = event {
|
||||
assert_eq!(d.len(), validator_count as usize);
|
||||
} else {
|
||||
panic!("Unexpected event {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,22 +36,22 @@
|
||||
//! task.
|
||||
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
use beacon_chain::{
|
||||
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
BlockError, ForkChoiceError,
|
||||
};
|
||||
use chain_segment::handle_chain_segment;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
|
||||
use environment::TaskExecutor;
|
||||
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
|
||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use ssz::Encode;
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
|
||||
};
|
||||
use worker::Worker;
|
||||
|
||||
mod chain_segment;
|
||||
mod worker;
|
||||
|
||||
pub use chain_segment::ProcessId;
|
||||
|
||||
@@ -78,6 +78,18 @@ const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024;
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096;
|
||||
|
||||
/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;
|
||||
|
||||
/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;
|
||||
|
||||
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
|
||||
/// will be stored before we start dropping them.
|
||||
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
|
||||
@@ -242,6 +254,54 @@ impl<E: EthSpec> WorkEvent<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some exit.
|
||||
pub fn gossip_voluntary_exit(
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
voluntary_exit: Box<SignedVoluntaryExit>,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::GossipVoluntaryExit {
|
||||
message_id,
|
||||
peer_id,
|
||||
voluntary_exit,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some proposer slashing.
|
||||
pub fn gossip_proposer_slashing(
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
proposer_slashing: Box<ProposerSlashing>,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::GossipProposerSlashing {
|
||||
message_id,
|
||||
peer_id,
|
||||
proposer_slashing,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some attester slashing.
|
||||
pub fn gossip_attester_slashing(
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attester_slashing: Box<AttesterSlashing<E>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::GossipAttesterSlashing {
|
||||
message_id,
|
||||
peer_id,
|
||||
attester_slashing,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some block, where the result from computation (if any) is
|
||||
/// sent to the other side of `result_tx`.
|
||||
pub fn rpc_beacon_block(block: Box<SignedBeaconBlock<E>>) -> (Self, BlockResultReceiver<E>) {
|
||||
@@ -282,6 +342,21 @@ pub enum Work<E: EthSpec> {
|
||||
peer_id: PeerId,
|
||||
block: Box<SignedBeaconBlock<E>>,
|
||||
},
|
||||
GossipVoluntaryExit {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
voluntary_exit: Box<SignedVoluntaryExit>,
|
||||
},
|
||||
GossipProposerSlashing {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
proposer_slashing: Box<ProposerSlashing>,
|
||||
},
|
||||
GossipAttesterSlashing {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attester_slashing: Box<AttesterSlashing<E>>,
|
||||
},
|
||||
RpcBlock {
|
||||
block: Box<SignedBeaconBlock<E>>,
|
||||
result_tx: BlockResultSender<E>,
|
||||
@@ -299,6 +374,9 @@ impl<E: EthSpec> Work<E> {
|
||||
Work::GossipAttestation { .. } => "gossip_attestation",
|
||||
Work::GossipAggregate { .. } => "gossip_aggregate",
|
||||
Work::GossipBlock { .. } => "gossip_block",
|
||||
Work::GossipVoluntaryExit { .. } => "gossip_voluntary_exit",
|
||||
Work::GossipProposerSlashing { .. } => "gossip_proposer_slashing",
|
||||
Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing",
|
||||
Work::RpcBlock { .. } => "rpc_block",
|
||||
Work::ChainSegment { .. } => "chain_segment",
|
||||
}
|
||||
@@ -351,21 +429,33 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver<WorkEvent<T::EthSpec>>) {
|
||||
let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
|
||||
|
||||
// Using LIFO queues for attestations since validator profits rely upon getting fresh
|
||||
// attestations into blocks. Additionally, later attestations contain more information than
|
||||
// earlier ones, so we consider them more valuable.
|
||||
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
|
||||
let mut aggregate_debounce = TimeLatch::default();
|
||||
|
||||
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
|
||||
let mut attestation_debounce = TimeLatch::default();
|
||||
|
||||
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
|
||||
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
|
||||
// a strong feeling about queue type for exits.
|
||||
let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);
|
||||
|
||||
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
|
||||
// queues with lots of junk messages.
|
||||
let mut gossip_proposer_slashing_queue =
|
||||
FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
|
||||
let mut gossip_attester_slashing_queue =
|
||||
FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);
|
||||
|
||||
// Using a FIFO queue since blocks need to be imported sequentially.
|
||||
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
|
||||
|
||||
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
||||
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
|
||||
|
||||
let executor = self.executor.clone();
|
||||
|
||||
// The manager future will run on the non-blocking executor and delegate tasks to worker
|
||||
// The manager future will run on the core executor and delegate tasks to worker
|
||||
// threads on the blocking executor.
|
||||
let manager_future = async move {
|
||||
loop {
|
||||
@@ -452,6 +542,18 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
self.spawn_worker(idle_tx.clone(), item);
|
||||
} else if let Some(item) = attestation_queue.pop() {
|
||||
self.spawn_worker(idle_tx.clone(), item);
|
||||
// Check slashings after all other consensus messages so we prioritize
|
||||
// following head.
|
||||
//
|
||||
// Check attester slashings before proposer slashings since they have the
|
||||
// potential to slash multiple validators at once.
|
||||
} else if let Some(item) = gossip_attester_slashing_queue.pop() {
|
||||
self.spawn_worker(idle_tx.clone(), item);
|
||||
} else if let Some(item) = gossip_proposer_slashing_queue.pop() {
|
||||
self.spawn_worker(idle_tx.clone(), item);
|
||||
// Check exits last since our validators don't get rewards from them.
|
||||
} else if let Some(item) = gossip_voluntary_exit_queue.pop() {
|
||||
self.spawn_worker(idle_tx.clone(), item);
|
||||
}
|
||||
}
|
||||
// There is no new work event and we are unable to spawn a new worker.
|
||||
@@ -491,6 +593,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::GossipBlock { .. } => {
|
||||
gossip_block_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipVoluntaryExit { .. } => {
|
||||
gossip_voluntary_exit_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipProposerSlashing { .. } => {
|
||||
gossip_proposer_slashing_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipAttesterSlashing { .. } => {
|
||||
gossip_attester_slashing_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
|
||||
Work::ChainSegment { .. } => {
|
||||
chain_segment_queue.push(work, work_id, &self.log)
|
||||
@@ -523,6 +634,18 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
|
||||
chain_segment_queue.len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL,
|
||||
gossip_voluntary_exit_queue.len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL,
|
||||
gossip_proposer_slashing_queue.len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL,
|
||||
gossip_attester_slashing_queue.len() as i64,
|
||||
);
|
||||
|
||||
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
|
||||
error!(
|
||||
@@ -544,7 +667,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn on the non-blocking executor.
|
||||
// Spawn on the core executor.
|
||||
executor.spawn(manager_future, MANAGER_TASK_NAME);
|
||||
}
|
||||
|
||||
@@ -574,11 +697,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
return;
|
||||
};
|
||||
|
||||
let network_tx = self.network_tx.clone();
|
||||
let sync_tx = self.sync_tx.clone();
|
||||
let log = self.log.clone();
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let worker = Worker {
|
||||
chain,
|
||||
network_tx: self.network_tx.clone(),
|
||||
sync_tx: self.sync_tx.clone(),
|
||||
log: self.log.clone(),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Spawning beacon processor worker";
|
||||
@@ -589,298 +717,85 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
executor.spawn_blocking(
|
||||
move || {
|
||||
let _worker_timer = worker_timer;
|
||||
let inner_log = log.clone();
|
||||
|
||||
// We use this closure pattern to avoid using a `return` that prevents the
|
||||
// `idle_tx` message from sending.
|
||||
let handler = || {
|
||||
let log = inner_log.clone();
|
||||
match work {
|
||||
/*
|
||||
* Unaggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAttestation {
|
||||
message_id,
|
||||
peer_id,
|
||||
attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
} => {
|
||||
let beacon_block_root = attestation.data.beacon_block_root;
|
||||
|
||||
let attestation = match chain
|
||||
.verify_unaggregated_attestation_for_gossip(*attestation, subnet_id)
|
||||
{
|
||||
Ok(attestation) => attestation,
|
||||
Err(e) => {
|
||||
handle_attestation_verification_failure(
|
||||
&log,
|
||||
sync_tx,
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"unaggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
||||
|
||||
if !should_import {
|
||||
return;
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(
|
||||
ForkChoiceError::InvalidAttestation(e),
|
||||
) => debug!(
|
||||
log,
|
||||
"Attestation invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
e => error!(
|
||||
log,
|
||||
"Error applying attestation to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) {
|
||||
debug!(
|
||||
log,
|
||||
"Attestation invalid for agg pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
||||
);
|
||||
}
|
||||
/*
|
||||
* Aggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAggregate {
|
||||
message_id,
|
||||
peer_id,
|
||||
aggregate,
|
||||
} => {
|
||||
let beacon_block_root =
|
||||
aggregate.message.aggregate.data.beacon_block_root;
|
||||
|
||||
let aggregate =
|
||||
match chain.verify_aggregated_attestation_for_gossip(*aggregate) {
|
||||
Ok(aggregate) => aggregate,
|
||||
Err(e) => {
|
||||
handle_attestation_verification_failure(
|
||||
&log,
|
||||
sync_tx,
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"aggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(
|
||||
ForkChoiceError::InvalidAttestation(e),
|
||||
) => debug!(
|
||||
log,
|
||||
"Aggregate invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
e => error!(
|
||||
log,
|
||||
"Error applying aggregate to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) {
|
||||
debug!(
|
||||
log,
|
||||
"Attestation invalid for op pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
||||
);
|
||||
}
|
||||
/*
|
||||
* Verification for beacon blocks received on gossip.
|
||||
*/
|
||||
Work::GossipBlock {
|
||||
message_id,
|
||||
peer_id,
|
||||
block,
|
||||
} => {
|
||||
let verified_block = match chain.verify_block_for_gossip(*block) {
|
||||
Ok(verified_block) => {
|
||||
info!(
|
||||
log,
|
||||
"New block received";
|
||||
"slot" => verified_block.block.slot(),
|
||||
"hash" => verified_block.block_root.to_string()
|
||||
);
|
||||
propagate_gossip_message(
|
||||
network_tx,
|
||||
message_id,
|
||||
peer_id.clone(),
|
||||
&log,
|
||||
);
|
||||
verified_block
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
send_sync_message(
|
||||
sync_tx,
|
||||
SyncMessage::UnknownBlock(peer_id, block),
|
||||
&log,
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||
debug!(
|
||||
log,
|
||||
"Gossip block is already known";
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
log,
|
||||
"Could not verify block for gossip";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
let block = Box::new(verified_block.block.clone());
|
||||
match chain.process_block(verified_block) {
|
||||
Ok(_block_root) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
|
||||
);
|
||||
|
||||
trace!(
|
||||
log,
|
||||
"Gossipsub block processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
|
||||
// TODO: It would be better if we can run this _after_ we publish the block to
|
||||
// reduce block propagation latency.
|
||||
//
|
||||
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
||||
// to have a reference to the `BeaconChain`. I will leave this for future
|
||||
// works.
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"location" => "block gossip"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "block gossip"
|
||||
),
|
||||
}
|
||||
}
|
||||
Err(BlockError::ParentUnknown { .. }) => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
// This should not occur. It should be checked by `should_forward_block`
|
||||
error!(
|
||||
log,
|
||||
"Block with unknown parent attempted to be processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
send_sync_message(
|
||||
sync_tx,
|
||||
SyncMessage::UnknownBlock(peer_id, block),
|
||||
&log,
|
||||
);
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
log,
|
||||
"Invalid gossip beacon block";
|
||||
"outcome" => format!("{:?}", other),
|
||||
"block root" => format!("{}", block.canonical_root()),
|
||||
"block slot" => block.slot()
|
||||
);
|
||||
trace!(
|
||||
log,
|
||||
"Invalid gossip beacon block ssz";
|
||||
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
/*
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock { block, result_tx } => {
|
||||
let block_result = chain.process_block(*block);
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
|
||||
);
|
||||
|
||||
if result_tx.send(block_result).is_err() {
|
||||
crit!(log, "Failed return sync block result");
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Verification for a chain segment (multiple blocks).
|
||||
*/
|
||||
Work::ChainSegment { process_id, blocks } => {
|
||||
handle_chain_segment(chain, process_id, blocks, sync_tx, log)
|
||||
}
|
||||
};
|
||||
match work {
|
||||
/*
|
||||
* Unaggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAttestation {
|
||||
message_id,
|
||||
peer_id,
|
||||
attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
} => worker.process_gossip_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
*attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
),
|
||||
/*
|
||||
* Aggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAggregate {
|
||||
message_id,
|
||||
peer_id,
|
||||
aggregate,
|
||||
} => worker.process_gossip_aggregate(message_id, peer_id, *aggregate),
|
||||
/*
|
||||
* Verification for beacon blocks received on gossip.
|
||||
*/
|
||||
Work::GossipBlock {
|
||||
message_id,
|
||||
peer_id,
|
||||
block,
|
||||
} => worker.process_gossip_block(message_id, peer_id, *block),
|
||||
/*
|
||||
* Voluntary exits received on gossip.
|
||||
*/
|
||||
Work::GossipVoluntaryExit {
|
||||
message_id,
|
||||
peer_id,
|
||||
voluntary_exit,
|
||||
} => worker.process_gossip_voluntary_exit(message_id, peer_id, *voluntary_exit),
|
||||
/*
|
||||
* Proposer slashings received on gossip.
|
||||
*/
|
||||
Work::GossipProposerSlashing {
|
||||
message_id,
|
||||
peer_id,
|
||||
proposer_slashing,
|
||||
} => worker.process_gossip_proposer_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
*proposer_slashing,
|
||||
),
|
||||
/*
|
||||
* Attester slashings received on gossip.
|
||||
*/
|
||||
Work::GossipAttesterSlashing {
|
||||
message_id,
|
||||
peer_id,
|
||||
attester_slashing,
|
||||
} => worker.process_gossip_attester_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
*attester_slashing,
|
||||
),
|
||||
/*
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock { block, result_tx } => {
|
||||
worker.process_rpc_block(*block, result_tx)
|
||||
}
|
||||
/*
|
||||
* Verification for a chain segment (multiple blocks).
|
||||
*/
|
||||
Work::ChainSegment { process_id, blocks } => {
|
||||
worker.process_chain_segment(process_id, blocks)
|
||||
}
|
||||
};
|
||||
handler();
|
||||
|
||||
trace!(
|
||||
log,
|
||||
@@ -902,300 +817,3 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
||||
/// the gossip network.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn propagate_gossip_message<E: EthSpec>(
|
||||
network_tx: mpsc::UnboundedSender<NetworkMessage<E>>,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
log: &Logger,
|
||||
) {
|
||||
network_tx
|
||||
.send(NetworkMessage::Validate {
|
||||
propagation_source: peer_id,
|
||||
message_id,
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
log,
|
||||
"Could not send propagation request to the network service"
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// Send a message to `sync_tx`.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn send_sync_message<E: EthSpec>(
|
||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
||||
message: SyncMessage<E>,
|
||||
log: &Logger,
|
||||
) {
|
||||
sync_tx
|
||||
.send(message)
|
||||
.unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
|
||||
}
|
||||
|
||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||
/// network.
|
||||
pub fn handle_attestation_verification_failure<E: EthSpec>(
|
||||
log: &Logger,
|
||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
||||
peer_id: PeerId,
|
||||
beacon_block_root: Hash256,
|
||||
attestation_type: &str,
|
||||
error: AttnError,
|
||||
) {
|
||||
metrics::register_attestation_error(&error);
|
||||
match &error {
|
||||
AttnError::FutureEpoch { .. }
|
||||
| AttnError::PastEpoch { .. }
|
||||
| AttnError::FutureSlot { .. }
|
||||
| AttnError::PastSlot { .. } => {
|
||||
/*
|
||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||
*
|
||||
*
|
||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||
*/
|
||||
}
|
||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||
/*
|
||||
* These errors are caused by invalid signatures.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::EmptyAggregationBitfield => {
|
||||
/*
|
||||
* The aggregate had no signatures and is therefore worthless.
|
||||
*
|
||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||
* violation of the spec nor indication of fault.
|
||||
*
|
||||
* This may change soon. Reference:
|
||||
*
|
||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorNotInCommittee { .. } => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Attestation already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* There has already been an aggregate attestation seen from this
|
||||
* aggregator index.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Aggregator already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::PriorAttestationKnown { .. } => {
|
||||
/*
|
||||
* We have already seen an attestation from this validator for this epoch.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Prior attestation known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::ValidatorIndexTooHigh(_) => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||
// just old. See:
|
||||
//
|
||||
// https://github.com/sigp/lighthouse/issues/1039
|
||||
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root)
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
sync_tx
|
||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
log,
|
||||
"Failed to send to sync service";
|
||||
"msg" => "UnknownBlockHash"
|
||||
)
|
||||
});
|
||||
return;
|
||||
}
|
||||
AttnError::UnknownTargetRoot(_) => {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
* 2. We do not have the target root in our DB.
|
||||
*
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BadTargetEpoch => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||
/*
|
||||
* It is not possible to attest this the given committee in the given slot.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||
/*
|
||||
* The unaggregated attestation doesn't have only one signature.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestsToFutureBlock { .. } => {
|
||||
/*
|
||||
* The beacon_block_root is from a higher slot than the attestation.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
|
||||
AttnError::InvalidSubnetId { received, expected } => {
|
||||
/*
|
||||
* The attestation was received on an incorrect subnet id.
|
||||
*/
|
||||
debug!(
|
||||
log,
|
||||
"Received attestation on incorrect subnet";
|
||||
"expected" => format!("{:?}", expected),
|
||||
"received" => format!("{:?}", received),
|
||||
)
|
||||
}
|
||||
AttnError::Invalid(_) => {
|
||||
/*
|
||||
* The attestation failed the state_processing verification.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::TooManySkippedSlots {
|
||||
head_block_slot,
|
||||
attestation_slot,
|
||||
} => {
|
||||
/*
|
||||
* The attestation references a head block that is too far behind the attestation slot.
|
||||
*
|
||||
* The message is not necessarily invalid, but we choose to ignore it.
|
||||
*/
|
||||
debug!(
|
||||
log,
|
||||
"Rejected long skip slot attestation";
|
||||
"head_block_slot" => head_block_slot,
|
||||
"attestation_slot" => attestation_slot,
|
||||
)
|
||||
}
|
||||
AttnError::BeaconChainError(e) => {
|
||||
/*
|
||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||
* so we have a bug.
|
||||
*
|
||||
* It's not clear if the message is invalid/malicious.
|
||||
*/
|
||||
error!(
|
||||
log,
|
||||
"Unable to validate aggregate";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
log,
|
||||
"Invalid attestation from network";
|
||||
"reason" => format!("{:?}", error),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
}
|
||||
|
||||
726
beacon_node/network/src/beacon_processor/worker.rs
Normal file
726
beacon_node/network/src/beacon_processor/worker.rs
Normal file
@@ -0,0 +1,726 @@
|
||||
use super::{
|
||||
chain_segment::{handle_chain_segment, ProcessId},
|
||||
BlockResultSender,
|
||||
};
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
use beacon_chain::{
|
||||
attestation_verification::Error as AttnError, observed_operations::ObservationOutcome,
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
|
||||
};
|
||||
use eth2_libp2p::{MessageId, PeerId};
|
||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use ssz::Encode;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
|
||||
};
|
||||
|
||||
/// Contains the context necessary to import blocks, attestations, etc to the beacon chain.
|
||||
pub struct Worker<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Worker<T> {
|
||||
/// Process the unaggregated attestation received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to apply it to fork choice.
|
||||
/// - Attempt to add it to the naive aggregation pool.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_attestation(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attestation: Attestation<T::EthSpec>,
|
||||
subnet_id: SubnetId,
|
||||
should_import: bool,
|
||||
) {
|
||||
let beacon_block_root = attestation.data.beacon_block_root;
|
||||
|
||||
let attestation = match self
|
||||
.chain
|
||||
.verify_unaggregated_attestation_for_gossip(attestation, subnet_id)
|
||||
{
|
||||
Ok(attestation) => attestation,
|
||||
Err(e) => {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"unaggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
|
||||
if !should_import {
|
||||
return;
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||
|
||||
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
e => error!(
|
||||
self.log,
|
||||
"Error applying attestation to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for agg pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
/// Process the aggregated attestation received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to apply it to fork choice.
|
||||
/// - Attempt to add it to the block inclusion pool.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_aggregate(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
aggregate: SignedAggregateAndProof<T::EthSpec>,
|
||||
) {
|
||||
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
|
||||
|
||||
let aggregate = match self
|
||||
.chain
|
||||
.verify_aggregated_attestation_for_gossip(aggregate)
|
||||
{
|
||||
Ok(aggregate) => aggregate,
|
||||
Err(e) => {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"aggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||
|
||||
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Aggregate invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
e => error!(
|
||||
self.log,
|
||||
"Error applying aggregate to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for op pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
/// Process the beacon block received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to
|
||||
/// be downloaded.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_block(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
block: SignedBeaconBlock<T::EthSpec>,
|
||||
) {
|
||||
let verified_block = match self.chain.verify_block_for_gossip(block) {
|
||||
Ok(verified_block) => {
|
||||
info!(
|
||||
self.log,
|
||||
"New block received";
|
||||
"slot" => verified_block.block.slot(),
|
||||
"hash" => verified_block.block_root.to_string()
|
||||
);
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
verified_block
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
return;
|
||||
}
|
||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Gossip block is already known";
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not verify block for gossip";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
|
||||
|
||||
let block = Box::new(verified_block.block.clone());
|
||||
match self.chain.process_block(verified_block) {
|
||||
Ok(_block_root) => {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Gossipsub block processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
|
||||
// TODO: It would be better if we can run this _after_ we publish the block to
|
||||
// reduce block propagation latency.
|
||||
//
|
||||
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
||||
// to have a reference to the `BeaconChain`. I will leave this for future
|
||||
// works.
|
||||
match self.chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
self.log,
|
||||
"Fork choice success";
|
||||
"location" => "block gossip"
|
||||
),
|
||||
Err(e) => error!(
|
||||
self.log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "block gossip"
|
||||
),
|
||||
}
|
||||
}
|
||||
Err(BlockError::ParentUnknown { .. }) => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
// This should not occur. It should be checked by `should_forward_block`
|
||||
error!(
|
||||
self.log,
|
||||
"Block with unknown parent attempted to be processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Invalid gossip beacon block";
|
||||
"outcome" => format!("{:?}", other),
|
||||
"block root" => format!("{}", block.canonical_root()),
|
||||
"block slot" => block.slot()
|
||||
);
|
||||
trace!(
|
||||
self.log,
|
||||
"Invalid gossip beacon block ssz";
|
||||
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn process_gossip_voluntary_exit(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
voluntary_exit: SignedVoluntaryExit,
|
||||
) {
|
||||
let validator_index = voluntary_exit.message.validator_index;
|
||||
|
||||
let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) {
|
||||
Ok(ObservationOutcome::New(exit)) => exit,
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping exit for already exiting validator";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid exit";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL);
|
||||
|
||||
self.propagate_gossip_message(message_id, peer_id);
|
||||
|
||||
self.chain.import_voluntary_exit(exit);
|
||||
debug!(self.log, "Successfully imported voluntary exit");
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
pub fn process_gossip_proposer_slashing(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
proposer_slashing: ProposerSlashing,
|
||||
) {
|
||||
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;
|
||||
|
||||
let slashing = match self
|
||||
.chain
|
||||
.verify_proposer_slashing_for_gossip(proposer_slashing)
|
||||
{
|
||||
Ok(ObservationOutcome::New(slashing)) => slashing,
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping proposer slashing";
|
||||
"reason" => "Already seen a proposer slashing for that validator",
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid proposer slashing";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
|
||||
|
||||
self.propagate_gossip_message(message_id, peer_id);
|
||||
|
||||
self.chain.import_proposer_slashing(slashing);
|
||||
debug!(self.log, "Successfully imported proposer slashing");
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
pub fn process_gossip_attester_slashing(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attester_slashing: AttesterSlashing<T::EthSpec>,
|
||||
) {
|
||||
let slashing = match self
|
||||
.chain
|
||||
.verify_attester_slashing_for_gossip(attester_slashing)
|
||||
{
|
||||
Ok(ObservationOutcome::New(slashing)) => slashing,
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping attester slashing";
|
||||
"reason" => "Slashings already known for all slashed validators",
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid attester slashing";
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
|
||||
|
||||
self.propagate_gossip_message(message_id, peer_id);
|
||||
|
||||
if let Err(e) = self.chain.import_attester_slashing(slashing) {
|
||||
debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e));
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL);
|
||||
} else {
|
||||
debug!(self.log, "Successfully imported attester slashing");
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to process a block received from a direct RPC request, returning the processing
|
||||
/// result on the `result_tx` channel.
|
||||
///
|
||||
/// Raises a log if there are errors publishing the result to the channel.
|
||||
pub fn process_rpc_block(
|
||||
self,
|
||||
block: SignedBeaconBlock<T::EthSpec>,
|
||||
result_tx: BlockResultSender<T::EthSpec>,
|
||||
) {
|
||||
let block_result = self.chain.process_block(block);
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
if result_tx.send(block_result).is_err() {
|
||||
crit!(self.log, "Failed return sync block result");
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
|
||||
/// thread if more blocks are needed to process it.
|
||||
pub fn process_chain_segment(
|
||||
self,
|
||||
process_id: ProcessId,
|
||||
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log)
|
||||
}
|
||||
|
||||
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
||||
/// the gossip network.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) {
|
||||
self.network_tx
|
||||
.send(NetworkMessage::Validate {
|
||||
propagation_source: peer_id,
|
||||
message_id,
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not send propagation request to the network service"
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// Send a message to `sync_tx`.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
|
||||
self.sync_tx
|
||||
.send(message)
|
||||
.unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service"));
|
||||
}
|
||||
|
||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||
/// network.
|
||||
pub fn handle_attestation_verification_failure(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
beacon_block_root: Hash256,
|
||||
attestation_type: &str,
|
||||
error: AttnError,
|
||||
) {
|
||||
metrics::register_attestation_error(&error);
|
||||
match &error {
|
||||
AttnError::FutureEpoch { .. }
|
||||
| AttnError::PastEpoch { .. }
|
||||
| AttnError::FutureSlot { .. }
|
||||
| AttnError::PastSlot { .. } => {
|
||||
/*
|
||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||
*
|
||||
*
|
||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||
*/
|
||||
}
|
||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||
/*
|
||||
* These errors are caused by invalid signatures.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::EmptyAggregationBitfield => {
|
||||
/*
|
||||
* The aggregate had no signatures and is therefore worthless.
|
||||
*
|
||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||
* violation of the spec nor indication of fault.
|
||||
*
|
||||
* This may change soon. Reference:
|
||||
*
|
||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorNotInCommittee { .. } => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Attestation already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* There has already been an aggregate attestation seen from this
|
||||
* aggregator index.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Aggregator already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::PriorAttestationKnown { .. } => {
|
||||
/*
|
||||
* We have already seen an attestation from this validator for this epoch.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Prior attestation known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::ValidatorIndexTooHigh(_) => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||
// just old. See:
|
||||
//
|
||||
// https://github.com/sigp/lighthouse/issues/1039
|
||||
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root)
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
self.sync_tx
|
||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Failed to send to sync service";
|
||||
"msg" => "UnknownBlockHash"
|
||||
)
|
||||
});
|
||||
return;
|
||||
}
|
||||
AttnError::UnknownTargetRoot(_) => {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
* 2. We do not have the target root in our DB.
|
||||
*
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BadTargetEpoch => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||
/*
|
||||
* It is not possible to attest this the given committee in the given slot.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||
/*
|
||||
* The unaggregated attestation doesn't have only one signature.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestsToFutureBlock { .. } => {
|
||||
/*
|
||||
* The beacon_block_root is from a higher slot than the attestation.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
|
||||
AttnError::InvalidSubnetId { received, expected } => {
|
||||
/*
|
||||
* The attestation was received on an incorrect subnet id.
|
||||
*/
|
||||
debug!(
|
||||
self.log,
|
||||
"Received attestation on incorrect subnet";
|
||||
"expected" => format!("{:?}", expected),
|
||||
"received" => format!("{:?}", received),
|
||||
)
|
||||
}
|
||||
AttnError::Invalid(_) => {
|
||||
/*
|
||||
* The attestation failed the state_processing verification.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::TooManySkippedSlots {
|
||||
head_block_slot,
|
||||
attestation_slot,
|
||||
} => {
|
||||
/*
|
||||
* The attestation references a head block that is too far behind the attestation slot.
|
||||
*
|
||||
* The message is not necessarily invalid, but we choose to ignore it.
|
||||
*/
|
||||
debug!(
|
||||
self.log,
|
||||
"Rejected long skip slot attestation";
|
||||
"head_block_slot" => head_block_slot,
|
||||
"attestation_slot" => attestation_slot,
|
||||
)
|
||||
}
|
||||
AttnError::BeaconChainError(e) => {
|
||||
/*
|
||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||
* so we have a bug.
|
||||
*
|
||||
* It's not clear if the message is invalid/malicious.
|
||||
*/
|
||||
error!(
|
||||
self.log,
|
||||
"Unable to validate aggregate";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Invalid attestation from network";
|
||||
"reason" => format!("{:?}", error),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -98,6 +98,49 @@ lazy_static! {
|
||||
"beacon_processor_gossip_block_imported_total",
|
||||
"Total number of gossip blocks imported to fork choice, etc."
|
||||
);
|
||||
// Gossip Exits.
|
||||
pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
|
||||
"beacon_processor_exit_queue_total",
|
||||
"Count of exits from gossip waiting to be verified."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_exit_verified_total",
|
||||
"Total number of voluntary exits verified for propagation."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_exit_imported_total",
|
||||
"Total number of voluntary exits imported to the op pool."
|
||||
);
|
||||
// Gossip proposer slashings.
|
||||
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
|
||||
"beacon_processor_proposer_slashing_queue_total",
|
||||
"Count of proposer slashings from gossip waiting to be verified."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_proposer_slashing_verified_total",
|
||||
"Total number of proposer slashings verified for propagation."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_proposer_slashing_imported_total",
|
||||
"Total number of proposer slashings imported to the op pool."
|
||||
);
|
||||
// Gossip attester slashings.
|
||||
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
|
||||
"beacon_processor_attester_slashing_queue_total",
|
||||
"Count of attester slashings from gossip waiting to be verified."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_attester_slashing_verified_total",
|
||||
"Total number of attester slashings verified for propagation."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_attester_slashing_imported_total",
|
||||
"Total number of attester slashings imported to the op pool."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_attester_slashing_error_total",
|
||||
"Total number of attester slashings that raised an error during processing."
|
||||
);
|
||||
// Rpc blocks.
|
||||
pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
|
||||
"beacon_processor_rpc_block_queue_total",
|
||||
|
||||
@@ -16,7 +16,7 @@ use eth2_libp2p::{
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use processor::Processor;
|
||||
use slog::{debug, o, trace, warn};
|
||||
use slog::{debug, o, trace};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::EthSpec;
|
||||
@@ -26,8 +26,6 @@ use types::EthSpec;
|
||||
/// passing them to the internal message processor. The message processor spawns a syncing thread
|
||||
/// which manages which blocks need to be requested and processed.
|
||||
pub struct Router<T: BeaconChainTypes> {
|
||||
/// A channel to the network service to allow for gossip propagation.
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
/// Access to the peer db for logging.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
/// Processes validated and decoded messages from the network. Has direct access to the
|
||||
@@ -89,13 +87,12 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
executor.clone(),
|
||||
beacon_chain,
|
||||
network_globals.clone(),
|
||||
network_send.clone(),
|
||||
network_send,
|
||||
&log,
|
||||
);
|
||||
|
||||
// generate the Message handler
|
||||
let mut handler = Router {
|
||||
network_send,
|
||||
network_globals,
|
||||
processor,
|
||||
log: message_handler_log,
|
||||
@@ -232,13 +229,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
}
|
||||
PubsubMessage::VoluntaryExit(exit) => {
|
||||
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id));
|
||||
if let Some(verified_exit) = self
|
||||
.processor
|
||||
.verify_voluntary_exit_for_gossip(&peer_id, *exit)
|
||||
{
|
||||
self.propagate_message(id, peer_id);
|
||||
self.processor.import_verified_voluntary_exit(verified_exit);
|
||||
}
|
||||
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);
|
||||
}
|
||||
PubsubMessage::ProposerSlashing(proposer_slashing) => {
|
||||
debug!(
|
||||
@@ -246,14 +237,8 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received a proposer slashing";
|
||||
"peer_id" => format!("{}", peer_id)
|
||||
);
|
||||
if let Some(verified_proposer_slashing) = self
|
||||
.processor
|
||||
.verify_proposer_slashing_for_gossip(&peer_id, *proposer_slashing)
|
||||
{
|
||||
self.propagate_message(id, peer_id);
|
||||
self.processor
|
||||
.import_verified_proposer_slashing(verified_proposer_slashing);
|
||||
}
|
||||
self.processor
|
||||
.on_proposer_slashing_gossip(id, peer_id, proposer_slashing);
|
||||
}
|
||||
PubsubMessage::AttesterSlashing(attester_slashing) => {
|
||||
debug!(
|
||||
@@ -261,30 +246,9 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
"Received a attester slashing";
|
||||
"peer_id" => format!("{}", peer_id)
|
||||
);
|
||||
if let Some(verified_attester_slashing) = self
|
||||
.processor
|
||||
.verify_attester_slashing_for_gossip(&peer_id, *attester_slashing)
|
||||
{
|
||||
self.propagate_message(id, peer_id);
|
||||
self.processor
|
||||
.import_verified_attester_slashing(verified_attester_slashing);
|
||||
}
|
||||
self.processor
|
||||
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Informs the network service that the message should be forwarded to other peers (is valid).
|
||||
fn propagate_message(&mut self, message_id: MessageId, propagation_source: PeerId) {
|
||||
self.network_send
|
||||
.send(NetworkMessage::Validate {
|
||||
propagation_source,
|
||||
message_id,
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not send propagation request to the network service"
|
||||
)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,14 +3,13 @@ use crate::beacon_processor::{
|
||||
};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::sync::{PeerSyncInfo, SyncMessage};
|
||||
use beacon_chain::{observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::rpc::*;
|
||||
use eth2_libp2p::{
|
||||
MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response,
|
||||
};
|
||||
use itertools::process_results;
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use state_processing::SigVerifiedOp;
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -585,140 +584,70 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Verify a voluntary exit before gossiping or processing it.
|
||||
///
|
||||
/// Errors are logged at debug level.
|
||||
pub fn verify_voluntary_exit_for_gossip(
|
||||
&self,
|
||||
peer_id: &PeerId,
|
||||
voluntary_exit: SignedVoluntaryExit,
|
||||
) -> Option<SigVerifiedOp<SignedVoluntaryExit>> {
|
||||
let validator_index = voluntary_exit.message.validator_index;
|
||||
|
||||
match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) {
|
||||
Ok(ObservationOutcome::New(sig_verified_exit)) => Some(sig_verified_exit),
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping exit for already exiting validator";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid exit";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a verified exit into the op pool.
|
||||
pub fn import_verified_voluntary_exit(
|
||||
&self,
|
||||
verified_voluntary_exit: SigVerifiedOp<SignedVoluntaryExit>,
|
||||
pub fn on_voluntary_exit_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
voluntary_exit: Box<SignedVoluntaryExit>,
|
||||
) {
|
||||
self.chain.import_voluntary_exit(verified_voluntary_exit);
|
||||
debug!(self.log, "Successfully imported voluntary exit");
|
||||
self.beacon_processor_send
|
||||
.try_send(BeaconWorkEvent::gossip_voluntary_exit(
|
||||
message_id,
|
||||
peer_id,
|
||||
voluntary_exit,
|
||||
))
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
&self.log,
|
||||
"Unable to send to gossip processor";
|
||||
"type" => "voluntary exit gossip",
|
||||
"error" => e.to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Verify a proposer slashing before gossiping or processing it.
|
||||
///
|
||||
/// Errors are logged at debug level.
|
||||
pub fn verify_proposer_slashing_for_gossip(
|
||||
&self,
|
||||
peer_id: &PeerId,
|
||||
proposer_slashing: ProposerSlashing,
|
||||
) -> Option<SigVerifiedOp<ProposerSlashing>> {
|
||||
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;
|
||||
|
||||
match self
|
||||
.chain
|
||||
.verify_proposer_slashing_for_gossip(proposer_slashing)
|
||||
{
|
||||
Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing),
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping proposer slashing";
|
||||
"reason" => "Already seen a proposer slashing for that validator",
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid proposer slashing";
|
||||
"validator_index" => validator_index,
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a verified proposer slashing into the op pool.
|
||||
pub fn import_verified_proposer_slashing(
|
||||
&self,
|
||||
proposer_slashing: SigVerifiedOp<ProposerSlashing>,
|
||||
pub fn on_proposer_slashing_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
proposer_slashing: Box<ProposerSlashing>,
|
||||
) {
|
||||
self.chain.import_proposer_slashing(proposer_slashing);
|
||||
debug!(self.log, "Successfully imported proposer slashing");
|
||||
self.beacon_processor_send
|
||||
.try_send(BeaconWorkEvent::gossip_proposer_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
proposer_slashing,
|
||||
))
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
&self.log,
|
||||
"Unable to send to gossip processor";
|
||||
"type" => "proposer slashing gossip",
|
||||
"error" => e.to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Verify an attester slashing before gossiping or processing it.
|
||||
///
|
||||
/// Errors are logged at debug level.
|
||||
pub fn verify_attester_slashing_for_gossip(
|
||||
&self,
|
||||
peer_id: &PeerId,
|
||||
attester_slashing: AttesterSlashing<T::EthSpec>,
|
||||
) -> Option<SigVerifiedOp<AttesterSlashing<T::EthSpec>>> {
|
||||
match self
|
||||
.chain
|
||||
.verify_attester_slashing_for_gossip(attester_slashing)
|
||||
{
|
||||
Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing),
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping attester slashing";
|
||||
"reason" => "Slashings already known for all slashed validators",
|
||||
"peer" => peer_id.to_string()
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping invalid attester slashing";
|
||||
"peer" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a verified attester slashing into the op pool.
|
||||
pub fn import_verified_attester_slashing(
|
||||
&self,
|
||||
attester_slashing: SigVerifiedOp<AttesterSlashing<T::EthSpec>>,
|
||||
pub fn on_attester_slashing_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
|
||||
) {
|
||||
if let Err(e) = self.chain.import_attester_slashing(attester_slashing) {
|
||||
debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e));
|
||||
} else {
|
||||
debug!(self.log, "Successfully imported attester slashing");
|
||||
}
|
||||
self.beacon_processor_send
|
||||
.try_send(BeaconWorkEvent::gossip_attester_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
attester_slashing,
|
||||
))
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
&self.log,
|
||||
"Unable to send to gossip processor";
|
||||
"type" => "attester slashing gossip",
|
||||
"error" => e.to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -169,6 +169,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
mut service: NetworkService<T>,
|
||||
) -> error::Result<()> {
|
||||
let mut exit_rx = executor.exit();
|
||||
let mut shutdown_sender = executor.shutdown_sender();
|
||||
|
||||
// spawn on the current executor
|
||||
executor.spawn_without_exit(async move {
|
||||
@@ -271,8 +272,8 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
AttServiceMessage::EnrRemove(subnet_id) => {
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
|
||||
}
|
||||
AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => {
|
||||
service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl);
|
||||
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
||||
service.libp2p.swarm.discover_subnet_peers(subnets_to_discover);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -376,6 +377,12 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
Libp2pEvent::NewListenAddr(multiaddr) => {
|
||||
service.network_globals.listen_multiaddrs.write().push(multiaddr);
|
||||
}
|
||||
Libp2pEvent::ZeroListeners => {
|
||||
let _ = shutdown_sender.send("All listeners are closed. Unable to listen").await.map_err(|e| {
|
||||
warn!(service.log, "failed to send a shutdown signal"; "error" => e.to_string()
|
||||
)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,13 @@ mod tests {
|
||||
let runtime = Runtime::new().unwrap();
|
||||
|
||||
let (signal, exit) = exit_future::signal();
|
||||
let executor = environment::TaskExecutor::new(runtime.handle().clone(), exit, log.clone());
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = environment::TaskExecutor::new(
|
||||
runtime.handle().clone(),
|
||||
exit,
|
||||
log.clone(),
|
||||
shutdown_tx,
|
||||
);
|
||||
|
||||
let mut config = NetworkConfig::default();
|
||||
config.libp2p_port = 21212;
|
||||
|
||||
@@ -110,7 +110,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
}
|
||||
|
||||
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) {
|
||||
debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action"=> action.to_string());
|
||||
debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action" => action.to_string());
|
||||
self.network_send
|
||||
.send(NetworkMessage::ReportPeer { peer_id, action })
|
||||
.unwrap_or_else(|_| {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "boot_node"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use target_info::Target;
|
||||
/// `Lighthouse/v0.2.0-1419501f2+`
|
||||
pub const VERSION: &str = git_version!(
|
||||
args = ["--always", "--dirty=+"],
|
||||
prefix = "Lighthouse/v0.2.5-",
|
||||
prefix = "Lighthouse/v0.2.6-",
|
||||
fallback = "unknown"
|
||||
);
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lcli"
|
||||
description = "Lighthouse CLI (modeled after zcli)"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Paul Hauner <paul@paulhauner.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lighthouse"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::metrics;
|
||||
use futures::channel::mpsc::Sender;
|
||||
use futures::prelude::*;
|
||||
use slog::{debug, trace};
|
||||
use tokio::runtime::Handle;
|
||||
@@ -10,6 +11,12 @@ pub struct TaskExecutor {
|
||||
pub(crate) handle: Handle,
|
||||
/// The receiver exit future which on receiving shuts down the task
|
||||
pub(crate) exit: exit_future::Exit,
|
||||
/// Sender given to tasks, so that if they encounter a state in which execution cannot
|
||||
/// continue they can request that everything shuts down.
|
||||
///
|
||||
/// The task must provide a reason for shutting down.
|
||||
pub(crate) signal_tx: Sender<&'static str>,
|
||||
|
||||
pub(crate) log: slog::Logger,
|
||||
}
|
||||
|
||||
@@ -18,8 +25,18 @@ impl TaskExecutor {
|
||||
///
|
||||
/// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from
|
||||
/// a [`RuntimeContext`](struct.RuntimeContext.html)
|
||||
pub fn new(handle: Handle, exit: exit_future::Exit, log: slog::Logger) -> Self {
|
||||
Self { handle, exit, log }
|
||||
pub fn new(
|
||||
handle: Handle,
|
||||
exit: exit_future::Exit,
|
||||
log: slog::Logger,
|
||||
signal_tx: Sender<&'static str>,
|
||||
) -> Self {
|
||||
Self {
|
||||
handle,
|
||||
exit,
|
||||
signal_tx,
|
||||
log,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled
|
||||
@@ -51,7 +68,7 @@ impl TaskExecutor {
|
||||
|
||||
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
|
||||
/// like [spawn](#method.spawn).
|
||||
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
|
||||
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
|
||||
/// ensure that the task gets canceled appropriately.
|
||||
/// This function generates prometheus metrics on number of tasks and task duration.
|
||||
///
|
||||
@@ -121,6 +138,11 @@ impl TaskExecutor {
|
||||
self.exit.clone()
|
||||
}
|
||||
|
||||
/// Get a channel to request shutting down.
|
||||
pub fn shutdown_sender(&self) -> Sender<&'static str> {
|
||||
self.signal_tx.clone()
|
||||
}
|
||||
|
||||
/// Returns a reference to the logger.
|
||||
pub fn log(&self) -> &slog::Logger {
|
||||
&self.log
|
||||
|
||||
@@ -9,7 +9,11 @@
|
||||
|
||||
use eth2_config::Eth2Config;
|
||||
use eth2_testnet_config::Eth2TestnetConfig;
|
||||
use futures::channel::oneshot;
|
||||
use futures::channel::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
oneshot,
|
||||
};
|
||||
use futures::{future, StreamExt};
|
||||
|
||||
pub use executor::TaskExecutor;
|
||||
use slog::{info, o, Drain, Level, Logger};
|
||||
@@ -260,10 +264,13 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
|
||||
/// Consumes the builder, returning an `Environment`.
|
||||
pub fn build(self) -> Result<Environment<E>, String> {
|
||||
let (signal, exit) = exit_future::signal();
|
||||
let (signal_tx, signal_rx) = channel(1);
|
||||
Ok(Environment {
|
||||
runtime: self
|
||||
.runtime
|
||||
.ok_or_else(|| "Cannot build environment without runtime".to_string())?,
|
||||
signal_tx,
|
||||
signal_rx: Some(signal_rx),
|
||||
signal: Some(signal),
|
||||
exit,
|
||||
log: self
|
||||
@@ -295,6 +302,7 @@ impl<E: EthSpec> RuntimeContext<E> {
|
||||
Self {
|
||||
executor: TaskExecutor {
|
||||
handle: self.executor.handle.clone(),
|
||||
signal_tx: self.executor.signal_tx.clone(),
|
||||
exit: self.executor.exit.clone(),
|
||||
log: self.executor.log.new(o!("service" => service_name)),
|
||||
},
|
||||
@@ -318,6 +326,10 @@ impl<E: EthSpec> RuntimeContext<E> {
|
||||
/// validator client, or to run tests that involve logging and async task execution.
|
||||
pub struct Environment<E: EthSpec> {
|
||||
runtime: Runtime,
|
||||
/// Receiver side of an internal shutdown signal.
|
||||
signal_rx: Option<Receiver<&'static str>>,
|
||||
/// Sender to request shutting down.
|
||||
signal_tx: Sender<&'static str>,
|
||||
signal: Option<exit_future::Signal>,
|
||||
exit: exit_future::Exit,
|
||||
log: Logger,
|
||||
@@ -340,6 +352,7 @@ impl<E: EthSpec> Environment<E> {
|
||||
RuntimeContext {
|
||||
executor: TaskExecutor {
|
||||
exit: self.exit.clone(),
|
||||
signal_tx: self.signal_tx.clone(),
|
||||
handle: self.runtime().handle().clone(),
|
||||
log: self.log.clone(),
|
||||
},
|
||||
@@ -353,6 +366,7 @@ impl<E: EthSpec> Environment<E> {
|
||||
RuntimeContext {
|
||||
executor: TaskExecutor {
|
||||
exit: self.exit.clone(),
|
||||
signal_tx: self.signal_tx.clone(),
|
||||
handle: self.runtime().handle().clone(),
|
||||
log: self.log.new(o!("service" => service_name)),
|
||||
},
|
||||
@@ -361,8 +375,20 @@ impl<E: EthSpec> Environment<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Block the current thread until Ctrl+C is received.
|
||||
pub fn block_until_ctrl_c(&mut self) -> Result<(), String> {
|
||||
/// Block the current thread until a shutdown signal is received.
|
||||
///
|
||||
/// This can be either the user Ctrl-C'ing or a task requesting to shutdown.
|
||||
pub fn block_until_shutdown_requested(&mut self) -> Result<(), String> {
|
||||
// future of a task requesting to shutdown
|
||||
let mut rx = self
|
||||
.signal_rx
|
||||
.take()
|
||||
.ok_or("Inner shutdown already received")?;
|
||||
let inner_shutdown =
|
||||
async move { rx.next().await.ok_or("Internal shutdown channel exhausted") };
|
||||
futures::pin_mut!(inner_shutdown);
|
||||
|
||||
// setup for handling a Ctrl-C
|
||||
let (ctrlc_send, ctrlc_oneshot) = oneshot::channel();
|
||||
let ctrlc_send_c = RefCell::new(Some(ctrlc_send));
|
||||
ctrlc::set_handler(move || {
|
||||
@@ -372,10 +398,18 @@ impl<E: EthSpec> Environment<E> {
|
||||
})
|
||||
.map_err(|e| format!("Could not set ctrlc handler: {:?}", e))?;
|
||||
|
||||
// Block this thread until Crtl+C is pressed.
|
||||
self.runtime()
|
||||
.block_on(ctrlc_oneshot)
|
||||
.map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))
|
||||
// Block this thread until a shutdown signal is received.
|
||||
match self
|
||||
.runtime()
|
||||
.block_on(future::select(inner_shutdown, ctrlc_oneshot))
|
||||
{
|
||||
future::Either::Left((Ok(reason), _)) => {
|
||||
info!(self.log, "Internal shutdown received"; "reason" => reason);
|
||||
Ok(())
|
||||
}
|
||||
future::Either::Left((Err(e), _)) => Err(e.into()),
|
||||
future::Either::Right((x, _)) => x.map_err(|e| format!("Ctrlc oneshot failed: {}", e)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Shutdown the `tokio` runtime when all tasks are idle.
|
||||
|
||||
@@ -300,8 +300,8 @@ fn run<E: EthSpec>(
|
||||
return Err("No subcommand supplied.".into());
|
||||
}
|
||||
|
||||
// Block this thread until Crtl+C is pressed.
|
||||
environment.block_until_ctrl_c()?;
|
||||
// Block this thread until we get a ctrl-c or a task sends a shutdown signal.
|
||||
environment.block_until_shutdown_requested()?;
|
||||
info!(log, "Shutting down..");
|
||||
|
||||
environment.fire_signal();
|
||||
|
||||
@@ -16,5 +16,6 @@ exec lighthouse \
|
||||
--testnet-dir $TESTNET_DIR \
|
||||
--dummy-eth1 \
|
||||
--http \
|
||||
--port 9902 \
|
||||
--http-port 6052 \
|
||||
--boot-nodes $(cat $BEACON_DIR/beacon/network/enr.dat)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "validator_client"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>", "Luke Anderson <luke@lukeanderson.com.au>"]
|
||||
edition = "2018"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user