mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-22 14:24:44 +00:00
Refactor into the new Worker struct
This commit is contained in:
@@ -36,22 +36,19 @@
|
|||||||
//! task.
|
//! task.
|
||||||
|
|
||||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||||
use beacon_chain::{
|
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
|
||||||
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
|
||||||
BlockError, ForkChoiceError,
|
|
||||||
};
|
|
||||||
use chain_segment::handle_chain_segment;
|
|
||||||
use environment::TaskExecutor;
|
use environment::TaskExecutor;
|
||||||
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
|
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
|
||||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
use slog::{crit, debug, error, trace, warn, Logger};
|
||||||
use ssz::Encode;
|
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||||
|
use worker::Worker;
|
||||||
|
|
||||||
mod chain_segment;
|
mod chain_segment;
|
||||||
|
mod worker;
|
||||||
|
|
||||||
pub use chain_segment::ProcessId;
|
pub use chain_segment::ProcessId;
|
||||||
|
|
||||||
@@ -365,7 +362,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
|
|
||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
|
|
||||||
// The manager future will run on the non-blocking executor and delegate tasks to worker
|
// The manager future will run on the core executor and delegate tasks to worker
|
||||||
// threads on the blocking executor.
|
// threads on the blocking executor.
|
||||||
let manager_future = async move {
|
let manager_future = async move {
|
||||||
loop {
|
loop {
|
||||||
@@ -544,7 +541,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Spawn on the non-blocking executor.
|
// Spawn on the core executor.
|
||||||
executor.spawn(manager_future, MANAGER_TASK_NAME);
|
executor.spawn(manager_future, MANAGER_TASK_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -574,11 +571,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let network_tx = self.network_tx.clone();
|
|
||||||
let sync_tx = self.sync_tx.clone();
|
|
||||||
let log = self.log.clone();
|
let log = self.log.clone();
|
||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
|
|
||||||
|
let worker = Worker {
|
||||||
|
chain: chain.clone(),
|
||||||
|
network_tx: self.network_tx.clone(),
|
||||||
|
sync_tx: self.sync_tx.clone(),
|
||||||
|
log: self.log.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
self.log,
|
self.log,
|
||||||
"Spawning beacon processor worker";
|
"Spawning beacon processor worker";
|
||||||
@@ -589,298 +591,53 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
executor.spawn_blocking(
|
executor.spawn_blocking(
|
||||||
move || {
|
move || {
|
||||||
let _worker_timer = worker_timer;
|
let _worker_timer = worker_timer;
|
||||||
let inner_log = log.clone();
|
|
||||||
|
|
||||||
// We use this closure pattern to avoid using a `return` that prevents the
|
match work {
|
||||||
// `idle_tx` message from sending.
|
/*
|
||||||
let handler = || {
|
* Unaggregated attestation verification.
|
||||||
let log = inner_log.clone();
|
*/
|
||||||
match work {
|
Work::GossipAttestation {
|
||||||
/*
|
message_id,
|
||||||
* Unaggregated attestation verification.
|
peer_id,
|
||||||
*/
|
attestation,
|
||||||
Work::GossipAttestation {
|
subnet_id,
|
||||||
message_id,
|
should_import,
|
||||||
peer_id,
|
} => worker.process_gossip_attestation(
|
||||||
attestation,
|
message_id,
|
||||||
subnet_id,
|
peer_id,
|
||||||
should_import,
|
*attestation,
|
||||||
} => {
|
subnet_id,
|
||||||
let beacon_block_root = attestation.data.beacon_block_root;
|
should_import,
|
||||||
|
),
|
||||||
let attestation = match chain
|
/*
|
||||||
.verify_unaggregated_attestation_for_gossip(*attestation, subnet_id)
|
* Aggregated attestation verification.
|
||||||
{
|
*/
|
||||||
Ok(attestation) => attestation,
|
Work::GossipAggregate {
|
||||||
Err(e) => {
|
message_id,
|
||||||
handle_attestation_verification_failure(
|
peer_id,
|
||||||
&log,
|
aggregate,
|
||||||
sync_tx,
|
} => worker.process_gossip_aggregate(message_id, peer_id, *aggregate),
|
||||||
peer_id,
|
/*
|
||||||
beacon_block_root,
|
* Verification for beacon blocks received on gossip.
|
||||||
"unaggregated",
|
*/
|
||||||
e,
|
Work::GossipBlock {
|
||||||
);
|
message_id,
|
||||||
return;
|
peer_id,
|
||||||
}
|
block,
|
||||||
};
|
} => worker.process_gossip_block(message_id, peer_id, *block),
|
||||||
|
/*
|
||||||
// Indicate to the `Network` service that this message is valid and can be
|
* Verification for beacon blocks received during syncing via RPC.
|
||||||
// propagated on the gossip network.
|
*/
|
||||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
Work::RpcBlock { block, result_tx } => {
|
||||||
|
worker.process_rpc_block(*block, result_tx)
|
||||||
if !should_import {
|
}
|
||||||
return;
|
/*
|
||||||
}
|
* Verification for a chain segment (multiple blocks).
|
||||||
|
*/
|
||||||
metrics::inc_counter(
|
Work::ChainSegment { process_id, blocks } => {
|
||||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
worker.process_chain_segment(process_id, blocks)
|
||||||
);
|
}
|
||||||
|
|
||||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
|
|
||||||
match e {
|
|
||||||
BeaconChainError::ForkChoiceError(
|
|
||||||
ForkChoiceError::InvalidAttestation(e),
|
|
||||||
) => debug!(
|
|
||||||
log,
|
|
||||||
"Attestation invalid for fork choice";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
),
|
|
||||||
e => error!(
|
|
||||||
log,
|
|
||||||
"Error applying attestation to fork choice";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) {
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Attestation invalid for agg pool";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* Aggregated attestation verification.
|
|
||||||
*/
|
|
||||||
Work::GossipAggregate {
|
|
||||||
message_id,
|
|
||||||
peer_id,
|
|
||||||
aggregate,
|
|
||||||
} => {
|
|
||||||
let beacon_block_root =
|
|
||||||
aggregate.message.aggregate.data.beacon_block_root;
|
|
||||||
|
|
||||||
let aggregate =
|
|
||||||
match chain.verify_aggregated_attestation_for_gossip(*aggregate) {
|
|
||||||
Ok(aggregate) => aggregate,
|
|
||||||
Err(e) => {
|
|
||||||
handle_attestation_verification_failure(
|
|
||||||
&log,
|
|
||||||
sync_tx,
|
|
||||||
peer_id,
|
|
||||||
beacon_block_root,
|
|
||||||
"aggregated",
|
|
||||||
e,
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Indicate to the `Network` service that this message is valid and can be
|
|
||||||
// propagated on the gossip network.
|
|
||||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
|
||||||
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) {
|
|
||||||
match e {
|
|
||||||
BeaconChainError::ForkChoiceError(
|
|
||||||
ForkChoiceError::InvalidAttestation(e),
|
|
||||||
) => debug!(
|
|
||||||
log,
|
|
||||||
"Aggregate invalid for fork choice";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
),
|
|
||||||
e => error!(
|
|
||||||
log,
|
|
||||||
"Error applying aggregate to fork choice";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) {
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Attestation invalid for op pool";
|
|
||||||
"reason" => format!("{:?}", e),
|
|
||||||
"peer" => peer_id.to_string(),
|
|
||||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* Verification for beacon blocks received on gossip.
|
|
||||||
*/
|
|
||||||
Work::GossipBlock {
|
|
||||||
message_id,
|
|
||||||
peer_id,
|
|
||||||
block,
|
|
||||||
} => {
|
|
||||||
let verified_block = match chain.verify_block_for_gossip(*block) {
|
|
||||||
Ok(verified_block) => {
|
|
||||||
info!(
|
|
||||||
log,
|
|
||||||
"New block received";
|
|
||||||
"slot" => verified_block.block.slot(),
|
|
||||||
"hash" => verified_block.block_root.to_string()
|
|
||||||
);
|
|
||||||
propagate_gossip_message(
|
|
||||||
network_tx,
|
|
||||||
message_id,
|
|
||||||
peer_id.clone(),
|
|
||||||
&log,
|
|
||||||
);
|
|
||||||
verified_block
|
|
||||||
}
|
|
||||||
Err(BlockError::ParentUnknown(block)) => {
|
|
||||||
send_sync_message(
|
|
||||||
sync_tx,
|
|
||||||
SyncMessage::UnknownBlock(peer_id, block),
|
|
||||||
&log,
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Gossip block is already known";
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
log,
|
|
||||||
"Could not verify block for gossip";
|
|
||||||
"error" => format!("{:?}", e)
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL,
|
|
||||||
);
|
|
||||||
|
|
||||||
let block = Box::new(verified_block.block.clone());
|
|
||||||
match chain.process_block(verified_block) {
|
|
||||||
Ok(_block_root) => {
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
|
|
||||||
);
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
log,
|
|
||||||
"Gossipsub block processed";
|
|
||||||
"peer_id" => peer_id.to_string()
|
|
||||||
);
|
|
||||||
|
|
||||||
// TODO: It would be better if we can run this _after_ we publish the block to
|
|
||||||
// reduce block propagation latency.
|
|
||||||
//
|
|
||||||
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
|
||||||
// to have a reference to the `BeaconChain`. I will leave this for future
|
|
||||||
// works.
|
|
||||||
match chain.fork_choice() {
|
|
||||||
Ok(()) => trace!(
|
|
||||||
log,
|
|
||||||
"Fork choice success";
|
|
||||||
"location" => "block gossip"
|
|
||||||
),
|
|
||||||
Err(e) => error!(
|
|
||||||
log,
|
|
||||||
"Fork choice failed";
|
|
||||||
"error" => format!("{:?}", e),
|
|
||||||
"location" => "block gossip"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(BlockError::ParentUnknown { .. }) => {
|
|
||||||
// Inform the sync manager to find parents for this block
|
|
||||||
// This should not occur. It should be checked by `should_forward_block`
|
|
||||||
error!(
|
|
||||||
log,
|
|
||||||
"Block with unknown parent attempted to be processed";
|
|
||||||
"peer_id" => peer_id.to_string()
|
|
||||||
);
|
|
||||||
send_sync_message(
|
|
||||||
sync_tx,
|
|
||||||
SyncMessage::UnknownBlock(peer_id, block),
|
|
||||||
&log,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Invalid gossip beacon block";
|
|
||||||
"outcome" => format!("{:?}", other),
|
|
||||||
"block root" => format!("{}", block.canonical_root()),
|
|
||||||
"block slot" => block.slot()
|
|
||||||
);
|
|
||||||
trace!(
|
|
||||||
log,
|
|
||||||
"Invalid gossip beacon block ssz";
|
|
||||||
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* Verification for beacon blocks received during syncing via RPC.
|
|
||||||
*/
|
|
||||||
Work::RpcBlock { block, result_tx } => {
|
|
||||||
let block_result = chain.process_block(*block);
|
|
||||||
|
|
||||||
metrics::inc_counter(
|
|
||||||
&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
|
|
||||||
);
|
|
||||||
|
|
||||||
if result_tx.send(block_result).is_err() {
|
|
||||||
crit!(log, "Failed return sync block result");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* Verification for a chain segment (multiple blocks).
|
|
||||||
*/
|
|
||||||
Work::ChainSegment { process_id, blocks } => {
|
|
||||||
handle_chain_segment(chain, process_id, blocks, sync_tx, log)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
handler();
|
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
log,
|
log,
|
||||||
@@ -902,300 +659,3 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
|
||||||
/// the gossip network.
|
|
||||||
///
|
|
||||||
/// Creates a log if there is an interal error.
|
|
||||||
fn propagate_gossip_message<E: EthSpec>(
|
|
||||||
network_tx: mpsc::UnboundedSender<NetworkMessage<E>>,
|
|
||||||
message_id: MessageId,
|
|
||||||
peer_id: PeerId,
|
|
||||||
log: &Logger,
|
|
||||||
) {
|
|
||||||
network_tx
|
|
||||||
.send(NetworkMessage::Validate {
|
|
||||||
propagation_source: peer_id,
|
|
||||||
message_id,
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
warn!(
|
|
||||||
log,
|
|
||||||
"Could not send propagation request to the network service"
|
|
||||||
)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a message to `sync_tx`.
|
|
||||||
///
|
|
||||||
/// Creates a log if there is an interal error.
|
|
||||||
fn send_sync_message<E: EthSpec>(
|
|
||||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
|
||||||
message: SyncMessage<E>,
|
|
||||||
log: &Logger,
|
|
||||||
) {
|
|
||||||
sync_tx
|
|
||||||
.send(message)
|
|
||||||
.unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
|
||||||
/// network.
|
|
||||||
pub fn handle_attestation_verification_failure<E: EthSpec>(
|
|
||||||
log: &Logger,
|
|
||||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
|
||||||
peer_id: PeerId,
|
|
||||||
beacon_block_root: Hash256,
|
|
||||||
attestation_type: &str,
|
|
||||||
error: AttnError,
|
|
||||||
) {
|
|
||||||
metrics::register_attestation_error(&error);
|
|
||||||
match &error {
|
|
||||||
AttnError::FutureEpoch { .. }
|
|
||||||
| AttnError::PastEpoch { .. }
|
|
||||||
| AttnError::FutureSlot { .. }
|
|
||||||
| AttnError::PastSlot { .. } => {
|
|
||||||
/*
|
|
||||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
|
||||||
/*
|
|
||||||
* These errors are caused by invalid signatures.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::EmptyAggregationBitfield => {
|
|
||||||
/*
|
|
||||||
* The aggregate had no signatures and is therefore worthless.
|
|
||||||
*
|
|
||||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
|
||||||
* violation of the spec nor indication of fault.
|
|
||||||
*
|
|
||||||
* This may change soon. Reference:
|
|
||||||
*
|
|
||||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
|
||||||
/*
|
|
||||||
* The aggregator index was higher than any known validator index. This is
|
|
||||||
* possible in two cases:
|
|
||||||
*
|
|
||||||
* 1. The attestation is malformed
|
|
||||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
|
||||||
*
|
|
||||||
* It should be impossible to reach (2) without triggering
|
|
||||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
|
||||||
* faulty.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::AggregatorNotInCommittee { .. } => {
|
|
||||||
/*
|
|
||||||
* The aggregator index was higher than any known validator index. This is
|
|
||||||
* possible in two cases:
|
|
||||||
*
|
|
||||||
* 1. The attestation is malformed
|
|
||||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
|
||||||
*
|
|
||||||
* It should be impossible to reach (2) without triggering
|
|
||||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
|
||||||
* faulty.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::AttestationAlreadyKnown { .. } => {
|
|
||||||
/*
|
|
||||||
* The aggregate attestation has already been observed on the network or in
|
|
||||||
* a block.
|
|
||||||
*
|
|
||||||
* The peer is not necessarily faulty.
|
|
||||||
*/
|
|
||||||
trace!(
|
|
||||||
log,
|
|
||||||
"Attestation already known";
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"block" => format!("{}", beacon_block_root),
|
|
||||||
"type" => format!("{:?}", attestation_type),
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AttnError::AggregatorAlreadyKnown(_) => {
|
|
||||||
/*
|
|
||||||
* There has already been an aggregate attestation seen from this
|
|
||||||
* aggregator index.
|
|
||||||
*
|
|
||||||
* The peer is not necessarily faulty.
|
|
||||||
*/
|
|
||||||
trace!(
|
|
||||||
log,
|
|
||||||
"Aggregator already known";
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"block" => format!("{}", beacon_block_root),
|
|
||||||
"type" => format!("{:?}", attestation_type),
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AttnError::PriorAttestationKnown { .. } => {
|
|
||||||
/*
|
|
||||||
* We have already seen an attestation from this validator for this epoch.
|
|
||||||
*
|
|
||||||
* The peer is not necessarily faulty.
|
|
||||||
*/
|
|
||||||
trace!(
|
|
||||||
log,
|
|
||||||
"Prior attestation known";
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"block" => format!("{}", beacon_block_root),
|
|
||||||
"type" => format!("{:?}", attestation_type),
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AttnError::ValidatorIndexTooHigh(_) => {
|
|
||||||
/*
|
|
||||||
* The aggregator index (or similar field) was higher than the maximum
|
|
||||||
* possible number of validators.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
|
||||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
|
||||||
// just old. See:
|
|
||||||
//
|
|
||||||
// https://github.com/sigp/lighthouse/issues/1039
|
|
||||||
|
|
||||||
// TODO: Maintain this attestation and re-process once sync completes
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Attestation for unknown block";
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"block" => format!("{}", beacon_block_root)
|
|
||||||
);
|
|
||||||
// we don't know the block, get the sync manager to handle the block lookup
|
|
||||||
sync_tx
|
|
||||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
warn!(
|
|
||||||
log,
|
|
||||||
"Failed to send to sync service";
|
|
||||||
"msg" => "UnknownBlockHash"
|
|
||||||
)
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AttnError::UnknownTargetRoot(_) => {
|
|
||||||
/*
|
|
||||||
* The block indicated by the target root is not known to us.
|
|
||||||
*
|
|
||||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
|
||||||
* error, so this means we can get this error if:
|
|
||||||
*
|
|
||||||
* 1. The target root does not represent a valid block.
|
|
||||||
* 2. We do not have the target root in our DB.
|
|
||||||
*
|
|
||||||
* For (2), we should only be processing attestations when we should have
|
|
||||||
* all the available information. Note: if we do a weak-subjectivity sync
|
|
||||||
* it's possible that this situation could occur, but I think it's
|
|
||||||
* unlikely. For now, we will declare this to be an invalid message>
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::BadTargetEpoch => {
|
|
||||||
/*
|
|
||||||
* The aggregator index (or similar field) was higher than the maximum
|
|
||||||
* possible number of validators.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
|
||||||
/*
|
|
||||||
* It is not possible to attest this the given committee in the given slot.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
|
||||||
/*
|
|
||||||
* The unaggregated attestation doesn't have only one signature.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::AttestsToFutureBlock { .. } => {
|
|
||||||
/*
|
|
||||||
* The beacon_block_root is from a higher slot than the attestation.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
AttnError::InvalidSubnetId { received, expected } => {
|
|
||||||
/*
|
|
||||||
* The attestation was received on an incorrect subnet id.
|
|
||||||
*/
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Received attestation on incorrect subnet";
|
|
||||||
"expected" => format!("{:?}", expected),
|
|
||||||
"received" => format!("{:?}", received),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
AttnError::Invalid(_) => {
|
|
||||||
/*
|
|
||||||
* The attestation failed the state_processing verification.
|
|
||||||
*
|
|
||||||
* The peer has published an invalid consensus message.
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
AttnError::TooManySkippedSlots {
|
|
||||||
head_block_slot,
|
|
||||||
attestation_slot,
|
|
||||||
} => {
|
|
||||||
/*
|
|
||||||
* The attestation references a head block that is too far behind the attestation slot.
|
|
||||||
*
|
|
||||||
* The message is not necessarily invalid, but we choose to ignore it.
|
|
||||||
*/
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Rejected long skip slot attestation";
|
|
||||||
"head_block_slot" => head_block_slot,
|
|
||||||
"attestation_slot" => attestation_slot,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
AttnError::BeaconChainError(e) => {
|
|
||||||
/*
|
|
||||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
|
||||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
|
||||||
* so we have a bug.
|
|
||||||
*
|
|
||||||
* It's not clear if the message is invalid/malicious.
|
|
||||||
*/
|
|
||||||
error!(
|
|
||||||
log,
|
|
||||||
"Unable to validate aggregate";
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"error" => format!("{:?}", e),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Invalid attestation from network";
|
|
||||||
"reason" => format!("{:?}", error),
|
|
||||||
"block" => format!("{}", beacon_block_root),
|
|
||||||
"peer_id" => peer_id.to_string(),
|
|
||||||
"type" => format!("{:?}", attestation_type),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|||||||
593
beacon_node/network/src/beacon_processor/worker.rs
Normal file
593
beacon_node/network/src/beacon_processor/worker.rs
Normal file
@@ -0,0 +1,593 @@
|
|||||||
|
use super::{
|
||||||
|
chain_segment::{handle_chain_segment, ProcessId},
|
||||||
|
BlockResultSender,
|
||||||
|
};
|
||||||
|
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||||
|
use beacon_chain::{
|
||||||
|
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||||
|
BlockError, ForkChoiceError,
|
||||||
|
};
|
||||||
|
use eth2_libp2p::{MessageId, PeerId};
|
||||||
|
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||||
|
use ssz::Encode;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use types::{Attestation, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||||
|
|
||||||
|
/// Contains the context necessary to import blocks, attestations, etc to the beacon chain.
|
||||||
|
pub struct Worker<T: BeaconChainTypes> {
|
||||||
|
pub chain: Arc<BeaconChain<T>>,
|
||||||
|
pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
|
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
|
||||||
|
pub log: Logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: BeaconChainTypes> Worker<T> {
|
||||||
|
/// Process the unaggregated attestation received from the gossip network and:
|
||||||
|
///
|
||||||
|
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||||
|
/// - Attempt to apply it to fork choice.
|
||||||
|
/// - Attempt to add it to the naive aggregation pool.
|
||||||
|
///
|
||||||
|
/// Raises a log if there are errors.
|
||||||
|
pub fn process_gossip_attestation(
|
||||||
|
self,
|
||||||
|
message_id: MessageId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
attestation: Attestation<T::EthSpec>,
|
||||||
|
subnet_id: SubnetId,
|
||||||
|
should_import: bool,
|
||||||
|
) {
|
||||||
|
let beacon_block_root = attestation.data.beacon_block_root;
|
||||||
|
|
||||||
|
let attestation = match self
|
||||||
|
.chain
|
||||||
|
.verify_unaggregated_attestation_for_gossip(attestation, subnet_id)
|
||||||
|
{
|
||||||
|
Ok(attestation) => attestation,
|
||||||
|
Err(e) => {
|
||||||
|
self.handle_attestation_verification_failure(
|
||||||
|
peer_id,
|
||||||
|
beacon_block_root,
|
||||||
|
"unaggregated",
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Indicate to the `Network` service that this message is valid and can be
|
||||||
|
// propagated on the gossip network.
|
||||||
|
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||||
|
|
||||||
|
if !should_import {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||||
|
|
||||||
|
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) {
|
||||||
|
match e {
|
||||||
|
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Attestation invalid for fork choice";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
e => error!(
|
||||||
|
self.log,
|
||||||
|
"Error applying attestation to fork choice";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Attestation invalid for agg pool";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process the aggregated attestation received from the gossip network and:
|
||||||
|
///
|
||||||
|
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||||
|
/// - Attempt to apply it to fork choice.
|
||||||
|
/// - Attempt to add it to the block inclusion pool.
|
||||||
|
///
|
||||||
|
/// Raises a log if there are errors.
|
||||||
|
pub fn process_gossip_aggregate(
|
||||||
|
self,
|
||||||
|
message_id: MessageId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
aggregate: SignedAggregateAndProof<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
|
||||||
|
|
||||||
|
let aggregate = match self
|
||||||
|
.chain
|
||||||
|
.verify_aggregated_attestation_for_gossip(aggregate)
|
||||||
|
{
|
||||||
|
Ok(aggregate) => aggregate,
|
||||||
|
Err(e) => {
|
||||||
|
self.handle_attestation_verification_failure(
|
||||||
|
peer_id,
|
||||||
|
beacon_block_root,
|
||||||
|
"aggregated",
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Indicate to the `Network` service that this message is valid and can be
|
||||||
|
// propagated on the gossip network.
|
||||||
|
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||||
|
|
||||||
|
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) {
|
||||||
|
match e {
|
||||||
|
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Aggregate invalid for fork choice";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
e => error!(
|
||||||
|
self.log,
|
||||||
|
"Error applying aggregate to fork choice";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Attestation invalid for op pool";
|
||||||
|
"reason" => format!("{:?}", e),
|
||||||
|
"peer" => peer_id.to_string(),
|
||||||
|
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process the beacon block received from the gossip network and:
|
||||||
|
///
|
||||||
|
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||||
|
/// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to
|
||||||
|
/// be downloaded.
|
||||||
|
///
|
||||||
|
/// Raises a log if there are errors.
|
||||||
|
pub fn process_gossip_block(
|
||||||
|
self,
|
||||||
|
message_id: MessageId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
block: SignedBeaconBlock<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
let verified_block = match self.chain.verify_block_for_gossip(block) {
|
||||||
|
Ok(verified_block) => {
|
||||||
|
info!(
|
||||||
|
self.log,
|
||||||
|
"New block received";
|
||||||
|
"slot" => verified_block.block.slot(),
|
||||||
|
"hash" => verified_block.block_root.to_string()
|
||||||
|
);
|
||||||
|
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||||
|
verified_block
|
||||||
|
}
|
||||||
|
Err(BlockError::ParentUnknown(block)) => {
|
||||||
|
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Gossip block is already known";
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Could not verify block for gossip";
|
||||||
|
"error" => format!("{:?}", e)
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
|
||||||
|
|
||||||
|
let block = Box::new(verified_block.block.clone());
|
||||||
|
match self.chain.process_block(verified_block) {
|
||||||
|
Ok(_block_root) => {
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||||
|
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Gossipsub block processed";
|
||||||
|
"peer_id" => peer_id.to_string()
|
||||||
|
);
|
||||||
|
|
||||||
|
// TODO: It would be better if we can run this _after_ we publish the block to
|
||||||
|
// reduce block propagation latency.
|
||||||
|
//
|
||||||
|
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
||||||
|
// to have a reference to the `BeaconChain`. I will leave this for future
|
||||||
|
// works.
|
||||||
|
match self.chain.fork_choice() {
|
||||||
|
Ok(()) => trace!(
|
||||||
|
self.log,
|
||||||
|
"Fork choice success";
|
||||||
|
"location" => "block gossip"
|
||||||
|
),
|
||||||
|
Err(e) => error!(
|
||||||
|
self.log,
|
||||||
|
"Fork choice failed";
|
||||||
|
"error" => format!("{:?}", e),
|
||||||
|
"location" => "block gossip"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(BlockError::ParentUnknown { .. }) => {
|
||||||
|
// Inform the sync manager to find parents for this block
|
||||||
|
// This should not occur. It should be checked by `should_forward_block`
|
||||||
|
error!(
|
||||||
|
self.log,
|
||||||
|
"Block with unknown parent attempted to be processed";
|
||||||
|
"peer_id" => peer_id.to_string()
|
||||||
|
);
|
||||||
|
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Invalid gossip beacon block";
|
||||||
|
"outcome" => format!("{:?}", other),
|
||||||
|
"block root" => format!("{}", block.canonical_root()),
|
||||||
|
"block slot" => block.slot()
|
||||||
|
);
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Invalid gossip beacon block ssz";
|
||||||
|
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to process a block received from a direct RPC request, returning the processing
|
||||||
|
/// result on the `result_tx` channel.
|
||||||
|
///
|
||||||
|
/// Raises a log if there are errors publishing the result to the channel.
|
||||||
|
pub fn process_rpc_block(
|
||||||
|
self,
|
||||||
|
block: SignedBeaconBlock<T::EthSpec>,
|
||||||
|
result_tx: BlockResultSender<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
let block_result = self.chain.process_block(block);
|
||||||
|
|
||||||
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||||
|
|
||||||
|
if result_tx.send(block_result).is_err() {
|
||||||
|
crit!(self.log, "Failed return sync block result");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
|
||||||
|
/// thread if more blocks are needed to process it.
|
||||||
|
pub fn process_chain_segment(
|
||||||
|
self,
|
||||||
|
process_id: ProcessId,
|
||||||
|
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||||
|
) {
|
||||||
|
handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
||||||
|
/// the gossip network.
|
||||||
|
///
|
||||||
|
/// Creates a log if there is an interal error.
|
||||||
|
fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) {
|
||||||
|
self.network_tx
|
||||||
|
.send(NetworkMessage::Validate {
|
||||||
|
propagation_source: peer_id,
|
||||||
|
message_id,
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Could not send propagation request to the network service"
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a message to `sync_tx`.
|
||||||
|
///
|
||||||
|
/// Creates a log if there is an interal error.
|
||||||
|
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
|
||||||
|
self.sync_tx
|
||||||
|
.send(message)
|
||||||
|
.unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||||
|
/// network.
|
||||||
|
pub fn handle_attestation_verification_failure(
|
||||||
|
&self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
beacon_block_root: Hash256,
|
||||||
|
attestation_type: &str,
|
||||||
|
error: AttnError,
|
||||||
|
) {
|
||||||
|
metrics::register_attestation_error(&error);
|
||||||
|
match &error {
|
||||||
|
AttnError::FutureEpoch { .. }
|
||||||
|
| AttnError::PastEpoch { .. }
|
||||||
|
| AttnError::FutureSlot { .. }
|
||||||
|
| AttnError::PastSlot { .. } => {
|
||||||
|
/*
|
||||||
|
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||||
|
/*
|
||||||
|
* These errors are caused by invalid signatures.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::EmptyAggregationBitfield => {
|
||||||
|
/*
|
||||||
|
* The aggregate had no signatures and is therefore worthless.
|
||||||
|
*
|
||||||
|
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||||
|
* violation of the spec nor indication of fault.
|
||||||
|
*
|
||||||
|
* This may change soon. Reference:
|
||||||
|
*
|
||||||
|
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||||
|
/*
|
||||||
|
* The aggregator index was higher than any known validator index. This is
|
||||||
|
* possible in two cases:
|
||||||
|
*
|
||||||
|
* 1. The attestation is malformed
|
||||||
|
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||||
|
*
|
||||||
|
* It should be impossible to reach (2) without triggering
|
||||||
|
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||||
|
* faulty.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::AggregatorNotInCommittee { .. } => {
|
||||||
|
/*
|
||||||
|
* The aggregator index was higher than any known validator index. This is
|
||||||
|
* possible in two cases:
|
||||||
|
*
|
||||||
|
* 1. The attestation is malformed
|
||||||
|
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||||
|
*
|
||||||
|
* It should be impossible to reach (2) without triggering
|
||||||
|
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||||
|
* faulty.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::AttestationAlreadyKnown { .. } => {
|
||||||
|
/*
|
||||||
|
* The aggregate attestation has already been observed on the network or in
|
||||||
|
* a block.
|
||||||
|
*
|
||||||
|
* The peer is not necessarily faulty.
|
||||||
|
*/
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Attestation already known";
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"block" => format!("{}", beacon_block_root),
|
||||||
|
"type" => format!("{:?}", attestation_type),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AttnError::AggregatorAlreadyKnown(_) => {
|
||||||
|
/*
|
||||||
|
* There has already been an aggregate attestation seen from this
|
||||||
|
* aggregator index.
|
||||||
|
*
|
||||||
|
* The peer is not necessarily faulty.
|
||||||
|
*/
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Aggregator already known";
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"block" => format!("{}", beacon_block_root),
|
||||||
|
"type" => format!("{:?}", attestation_type),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AttnError::PriorAttestationKnown { .. } => {
|
||||||
|
/*
|
||||||
|
* We have already seen an attestation from this validator for this epoch.
|
||||||
|
*
|
||||||
|
* The peer is not necessarily faulty.
|
||||||
|
*/
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Prior attestation known";
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"block" => format!("{}", beacon_block_root),
|
||||||
|
"type" => format!("{:?}", attestation_type),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AttnError::ValidatorIndexTooHigh(_) => {
|
||||||
|
/*
|
||||||
|
* The aggregator index (or similar field) was higher than the maximum
|
||||||
|
* possible number of validators.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||||
|
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||||
|
// just old. See:
|
||||||
|
//
|
||||||
|
// https://github.com/sigp/lighthouse/issues/1039
|
||||||
|
|
||||||
|
// TODO: Maintain this attestation and re-process once sync completes
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Attestation for unknown block";
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"block" => format!("{}", beacon_block_root)
|
||||||
|
);
|
||||||
|
// we don't know the block, get the sync manager to handle the block lookup
|
||||||
|
self.sync_tx
|
||||||
|
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Failed to send to sync service";
|
||||||
|
"msg" => "UnknownBlockHash"
|
||||||
|
)
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AttnError::UnknownTargetRoot(_) => {
|
||||||
|
/*
|
||||||
|
* The block indicated by the target root is not known to us.
|
||||||
|
*
|
||||||
|
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||||
|
* error, so this means we can get this error if:
|
||||||
|
*
|
||||||
|
* 1. The target root does not represent a valid block.
|
||||||
|
* 2. We do not have the target root in our DB.
|
||||||
|
*
|
||||||
|
* For (2), we should only be processing attestations when we should have
|
||||||
|
* all the available information. Note: if we do a weak-subjectivity sync
|
||||||
|
* it's possible that this situation could occur, but I think it's
|
||||||
|
* unlikely. For now, we will declare this to be an invalid message>
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::BadTargetEpoch => {
|
||||||
|
/*
|
||||||
|
* The aggregator index (or similar field) was higher than the maximum
|
||||||
|
* possible number of validators.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||||
|
/*
|
||||||
|
* It is not possible to attest this the given committee in the given slot.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||||
|
/*
|
||||||
|
* The unaggregated attestation doesn't have only one signature.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::AttestsToFutureBlock { .. } => {
|
||||||
|
/*
|
||||||
|
* The beacon_block_root is from a higher slot than the attestation.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
AttnError::InvalidSubnetId { received, expected } => {
|
||||||
|
/*
|
||||||
|
* The attestation was received on an incorrect subnet id.
|
||||||
|
*/
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Received attestation on incorrect subnet";
|
||||||
|
"expected" => format!("{:?}", expected),
|
||||||
|
"received" => format!("{:?}", received),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
AttnError::Invalid(_) => {
|
||||||
|
/*
|
||||||
|
* The attestation failed the state_processing verification.
|
||||||
|
*
|
||||||
|
* The peer has published an invalid consensus message.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
AttnError::TooManySkippedSlots {
|
||||||
|
head_block_slot,
|
||||||
|
attestation_slot,
|
||||||
|
} => {
|
||||||
|
/*
|
||||||
|
* The attestation references a head block that is too far behind the attestation slot.
|
||||||
|
*
|
||||||
|
* The message is not necessarily invalid, but we choose to ignore it.
|
||||||
|
*/
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Rejected long skip slot attestation";
|
||||||
|
"head_block_slot" => head_block_slot,
|
||||||
|
"attestation_slot" => attestation_slot,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
AttnError::BeaconChainError(e) => {
|
||||||
|
/*
|
||||||
|
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||||
|
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||||
|
* so we have a bug.
|
||||||
|
*
|
||||||
|
* It's not clear if the message is invalid/malicious.
|
||||||
|
*/
|
||||||
|
error!(
|
||||||
|
self.log,
|
||||||
|
"Unable to validate aggregate";
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"error" => format!("{:?}", e),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Invalid attestation from network";
|
||||||
|
"reason" => format!("{:?}", error),
|
||||||
|
"block" => format!("{}", beacon_block_root),
|
||||||
|
"peer_id" => peer_id.to_string(),
|
||||||
|
"type" => format!("{:?}", attestation_type),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user