mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 04:31:51 +00:00
Refactor into the new Worker struct
This commit is contained in:
@@ -36,22 +36,19 @@
|
||||
//! task.
|
||||
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
use beacon_chain::{
|
||||
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
BlockError, ForkChoiceError,
|
||||
};
|
||||
use chain_segment::handle_chain_segment;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
|
||||
use environment::TaskExecutor;
|
||||
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
|
||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use ssz::Encode;
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||
use worker::Worker;
|
||||
|
||||
mod chain_segment;
|
||||
mod worker;
|
||||
|
||||
pub use chain_segment::ProcessId;
|
||||
|
||||
@@ -365,7 +362,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
|
||||
let executor = self.executor.clone();
|
||||
|
||||
// The manager future will run on the non-blocking executor and delegate tasks to worker
|
||||
// The manager future will run on the core executor and delegate tasks to worker
|
||||
// threads on the blocking executor.
|
||||
let manager_future = async move {
|
||||
loop {
|
||||
@@ -544,7 +541,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn on the non-blocking executor.
|
||||
// Spawn on the core executor.
|
||||
executor.spawn(manager_future, MANAGER_TASK_NAME);
|
||||
}
|
||||
|
||||
@@ -574,11 +571,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
return;
|
||||
};
|
||||
|
||||
let network_tx = self.network_tx.clone();
|
||||
let sync_tx = self.sync_tx.clone();
|
||||
let log = self.log.clone();
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let worker = Worker {
|
||||
chain: chain.clone(),
|
||||
network_tx: self.network_tx.clone(),
|
||||
sync_tx: self.sync_tx.clone(),
|
||||
log: self.log.clone(),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Spawning beacon processor worker";
|
||||
@@ -589,298 +591,53 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
executor.spawn_blocking(
|
||||
move || {
|
||||
let _worker_timer = worker_timer;
|
||||
let inner_log = log.clone();
|
||||
|
||||
// We use this closure pattern to avoid using a `return` that prevents the
|
||||
// `idle_tx` message from sending.
|
||||
let handler = || {
|
||||
let log = inner_log.clone();
|
||||
match work {
|
||||
/*
|
||||
* Unaggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAttestation {
|
||||
message_id,
|
||||
peer_id,
|
||||
attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
} => {
|
||||
let beacon_block_root = attestation.data.beacon_block_root;
|
||||
|
||||
let attestation = match chain
|
||||
.verify_unaggregated_attestation_for_gossip(*attestation, subnet_id)
|
||||
{
|
||||
Ok(attestation) => attestation,
|
||||
Err(e) => {
|
||||
handle_attestation_verification_failure(
|
||||
&log,
|
||||
sync_tx,
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"unaggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
||||
|
||||
if !should_import {
|
||||
return;
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(
|
||||
ForkChoiceError::InvalidAttestation(e),
|
||||
) => debug!(
|
||||
log,
|
||||
"Attestation invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
e => error!(
|
||||
log,
|
||||
"Error applying attestation to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) {
|
||||
debug!(
|
||||
log,
|
||||
"Attestation invalid for agg pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
||||
);
|
||||
}
|
||||
/*
|
||||
* Aggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAggregate {
|
||||
message_id,
|
||||
peer_id,
|
||||
aggregate,
|
||||
} => {
|
||||
let beacon_block_root =
|
||||
aggregate.message.aggregate.data.beacon_block_root;
|
||||
|
||||
let aggregate =
|
||||
match chain.verify_aggregated_attestation_for_gossip(*aggregate) {
|
||||
Ok(aggregate) => aggregate,
|
||||
Err(e) => {
|
||||
handle_attestation_verification_failure(
|
||||
&log,
|
||||
sync_tx,
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"aggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(
|
||||
ForkChoiceError::InvalidAttestation(e),
|
||||
) => debug!(
|
||||
log,
|
||||
"Aggregate invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
e => error!(
|
||||
log,
|
||||
"Error applying aggregate to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) {
|
||||
debug!(
|
||||
log,
|
||||
"Attestation invalid for op pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
|
||||
);
|
||||
}
|
||||
/*
|
||||
* Verification for beacon blocks received on gossip.
|
||||
*/
|
||||
Work::GossipBlock {
|
||||
message_id,
|
||||
peer_id,
|
||||
block,
|
||||
} => {
|
||||
let verified_block = match chain.verify_block_for_gossip(*block) {
|
||||
Ok(verified_block) => {
|
||||
info!(
|
||||
log,
|
||||
"New block received";
|
||||
"slot" => verified_block.block.slot(),
|
||||
"hash" => verified_block.block_root.to_string()
|
||||
);
|
||||
propagate_gossip_message(
|
||||
network_tx,
|
||||
message_id,
|
||||
peer_id.clone(),
|
||||
&log,
|
||||
);
|
||||
verified_block
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
send_sync_message(
|
||||
sync_tx,
|
||||
SyncMessage::UnknownBlock(peer_id, block),
|
||||
&log,
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||
debug!(
|
||||
log,
|
||||
"Gossip block is already known";
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
log,
|
||||
"Could not verify block for gossip";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL,
|
||||
);
|
||||
|
||||
let block = Box::new(verified_block.block.clone());
|
||||
match chain.process_block(verified_block) {
|
||||
Ok(_block_root) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
|
||||
);
|
||||
|
||||
trace!(
|
||||
log,
|
||||
"Gossipsub block processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
|
||||
// TODO: It would be better if we can run this _after_ we publish the block to
|
||||
// reduce block propagation latency.
|
||||
//
|
||||
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
||||
// to have a reference to the `BeaconChain`. I will leave this for future
|
||||
// works.
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"location" => "block gossip"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "block gossip"
|
||||
),
|
||||
}
|
||||
}
|
||||
Err(BlockError::ParentUnknown { .. }) => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
// This should not occur. It should be checked by `should_forward_block`
|
||||
error!(
|
||||
log,
|
||||
"Block with unknown parent attempted to be processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
send_sync_message(
|
||||
sync_tx,
|
||||
SyncMessage::UnknownBlock(peer_id, block),
|
||||
&log,
|
||||
);
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
log,
|
||||
"Invalid gossip beacon block";
|
||||
"outcome" => format!("{:?}", other),
|
||||
"block root" => format!("{}", block.canonical_root()),
|
||||
"block slot" => block.slot()
|
||||
);
|
||||
trace!(
|
||||
log,
|
||||
"Invalid gossip beacon block ssz";
|
||||
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
/*
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock { block, result_tx } => {
|
||||
let block_result = chain.process_block(*block);
|
||||
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
|
||||
);
|
||||
|
||||
if result_tx.send(block_result).is_err() {
|
||||
crit!(log, "Failed return sync block result");
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Verification for a chain segment (multiple blocks).
|
||||
*/
|
||||
Work::ChainSegment { process_id, blocks } => {
|
||||
handle_chain_segment(chain, process_id, blocks, sync_tx, log)
|
||||
}
|
||||
};
|
||||
match work {
|
||||
/*
|
||||
* Unaggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAttestation {
|
||||
message_id,
|
||||
peer_id,
|
||||
attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
} => worker.process_gossip_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
*attestation,
|
||||
subnet_id,
|
||||
should_import,
|
||||
),
|
||||
/*
|
||||
* Aggregated attestation verification.
|
||||
*/
|
||||
Work::GossipAggregate {
|
||||
message_id,
|
||||
peer_id,
|
||||
aggregate,
|
||||
} => worker.process_gossip_aggregate(message_id, peer_id, *aggregate),
|
||||
/*
|
||||
* Verification for beacon blocks received on gossip.
|
||||
*/
|
||||
Work::GossipBlock {
|
||||
message_id,
|
||||
peer_id,
|
||||
block,
|
||||
} => worker.process_gossip_block(message_id, peer_id, *block),
|
||||
/*
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock { block, result_tx } => {
|
||||
worker.process_rpc_block(*block, result_tx)
|
||||
}
|
||||
/*
|
||||
* Verification for a chain segment (multiple blocks).
|
||||
*/
|
||||
Work::ChainSegment { process_id, blocks } => {
|
||||
worker.process_chain_segment(process_id, blocks)
|
||||
}
|
||||
};
|
||||
handler();
|
||||
|
||||
trace!(
|
||||
log,
|
||||
@@ -902,300 +659,3 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
||||
/// the gossip network.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn propagate_gossip_message<E: EthSpec>(
|
||||
network_tx: mpsc::UnboundedSender<NetworkMessage<E>>,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
log: &Logger,
|
||||
) {
|
||||
network_tx
|
||||
.send(NetworkMessage::Validate {
|
||||
propagation_source: peer_id,
|
||||
message_id,
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
log,
|
||||
"Could not send propagation request to the network service"
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// Send a message to `sync_tx`.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn send_sync_message<E: EthSpec>(
|
||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
||||
message: SyncMessage<E>,
|
||||
log: &Logger,
|
||||
) {
|
||||
sync_tx
|
||||
.send(message)
|
||||
.unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
|
||||
}
|
||||
|
||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||
/// network.
|
||||
pub fn handle_attestation_verification_failure<E: EthSpec>(
|
||||
log: &Logger,
|
||||
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
|
||||
peer_id: PeerId,
|
||||
beacon_block_root: Hash256,
|
||||
attestation_type: &str,
|
||||
error: AttnError,
|
||||
) {
|
||||
metrics::register_attestation_error(&error);
|
||||
match &error {
|
||||
AttnError::FutureEpoch { .. }
|
||||
| AttnError::PastEpoch { .. }
|
||||
| AttnError::FutureSlot { .. }
|
||||
| AttnError::PastSlot { .. } => {
|
||||
/*
|
||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||
*
|
||||
*
|
||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||
*/
|
||||
}
|
||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||
/*
|
||||
* These errors are caused by invalid signatures.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::EmptyAggregationBitfield => {
|
||||
/*
|
||||
* The aggregate had no signatures and is therefore worthless.
|
||||
*
|
||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||
* violation of the spec nor indication of fault.
|
||||
*
|
||||
* This may change soon. Reference:
|
||||
*
|
||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorNotInCommittee { .. } => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Attestation already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* There has already been an aggregate attestation seen from this
|
||||
* aggregator index.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Aggregator already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::PriorAttestationKnown { .. } => {
|
||||
/*
|
||||
* We have already seen an attestation from this validator for this epoch.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
log,
|
||||
"Prior attestation known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::ValidatorIndexTooHigh(_) => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||
// just old. See:
|
||||
//
|
||||
// https://github.com/sigp/lighthouse/issues/1039
|
||||
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root)
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
sync_tx
|
||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
log,
|
||||
"Failed to send to sync service";
|
||||
"msg" => "UnknownBlockHash"
|
||||
)
|
||||
});
|
||||
return;
|
||||
}
|
||||
AttnError::UnknownTargetRoot(_) => {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
* 2. We do not have the target root in our DB.
|
||||
*
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BadTargetEpoch => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||
/*
|
||||
* It is not possible to attest this the given committee in the given slot.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||
/*
|
||||
* The unaggregated attestation doesn't have only one signature.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestsToFutureBlock { .. } => {
|
||||
/*
|
||||
* The beacon_block_root is from a higher slot than the attestation.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
|
||||
AttnError::InvalidSubnetId { received, expected } => {
|
||||
/*
|
||||
* The attestation was received on an incorrect subnet id.
|
||||
*/
|
||||
debug!(
|
||||
log,
|
||||
"Received attestation on incorrect subnet";
|
||||
"expected" => format!("{:?}", expected),
|
||||
"received" => format!("{:?}", received),
|
||||
)
|
||||
}
|
||||
AttnError::Invalid(_) => {
|
||||
/*
|
||||
* The attestation failed the state_processing verification.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::TooManySkippedSlots {
|
||||
head_block_slot,
|
||||
attestation_slot,
|
||||
} => {
|
||||
/*
|
||||
* The attestation references a head block that is too far behind the attestation slot.
|
||||
*
|
||||
* The message is not necessarily invalid, but we choose to ignore it.
|
||||
*/
|
||||
debug!(
|
||||
log,
|
||||
"Rejected long skip slot attestation";
|
||||
"head_block_slot" => head_block_slot,
|
||||
"attestation_slot" => attestation_slot,
|
||||
)
|
||||
}
|
||||
AttnError::BeaconChainError(e) => {
|
||||
/*
|
||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||
* so we have a bug.
|
||||
*
|
||||
* It's not clear if the message is invalid/malicious.
|
||||
*/
|
||||
error!(
|
||||
log,
|
||||
"Unable to validate aggregate";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
log,
|
||||
"Invalid attestation from network";
|
||||
"reason" => format!("{:?}", error),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
}
|
||||
|
||||
593
beacon_node/network/src/beacon_processor/worker.rs
Normal file
593
beacon_node/network/src/beacon_processor/worker.rs
Normal file
@@ -0,0 +1,593 @@
|
||||
use super::{
|
||||
chain_segment::{handle_chain_segment, ProcessId},
|
||||
BlockResultSender,
|
||||
};
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
use beacon_chain::{
|
||||
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
BlockError, ForkChoiceError,
|
||||
};
|
||||
use eth2_libp2p::{MessageId, PeerId};
|
||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use ssz::Encode;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{Attestation, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||
|
||||
/// Contains the context necessary to import blocks, attestations, etc to the beacon chain.
|
||||
pub struct Worker<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Worker<T> {
|
||||
/// Process the unaggregated attestation received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to apply it to fork choice.
|
||||
/// - Attempt to add it to the naive aggregation pool.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_attestation(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attestation: Attestation<T::EthSpec>,
|
||||
subnet_id: SubnetId,
|
||||
should_import: bool,
|
||||
) {
|
||||
let beacon_block_root = attestation.data.beacon_block_root;
|
||||
|
||||
let attestation = match self
|
||||
.chain
|
||||
.verify_unaggregated_attestation_for_gossip(attestation, subnet_id)
|
||||
{
|
||||
Ok(attestation) => attestation,
|
||||
Err(e) => {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"unaggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
|
||||
if !should_import {
|
||||
return;
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||
|
||||
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
e => error!(
|
||||
self.log,
|
||||
"Error applying attestation to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for agg pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
/// Process the aggregated attestation received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to apply it to fork choice.
|
||||
/// - Attempt to add it to the block inclusion pool.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_aggregate(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
aggregate: SignedAggregateAndProof<T::EthSpec>,
|
||||
) {
|
||||
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
|
||||
|
||||
let aggregate = match self
|
||||
.chain
|
||||
.verify_aggregated_attestation_for_gossip(aggregate)
|
||||
{
|
||||
Ok(aggregate) => aggregate,
|
||||
Err(e) => {
|
||||
self.handle_attestation_verification_failure(
|
||||
peer_id,
|
||||
beacon_block_root,
|
||||
"aggregated",
|
||||
e,
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the `Network` service that this message is valid and can be
|
||||
// propagated on the gossip network.
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
|
||||
|
||||
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) {
|
||||
match e {
|
||||
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Aggregate invalid for fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
e => error!(
|
||||
self.log,
|
||||
"Error applying aggregate to fork choice";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation invalid for op pool";
|
||||
"reason" => format!("{:?}", e),
|
||||
"peer" => peer_id.to_string(),
|
||||
"beacon_block_root" => format!("{:?}", beacon_block_root)
|
||||
)
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL);
|
||||
}
|
||||
|
||||
/// Process the beacon block received from the gossip network and:
|
||||
///
|
||||
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
/// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to
|
||||
/// be downloaded.
|
||||
///
|
||||
/// Raises a log if there are errors.
|
||||
pub fn process_gossip_block(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
block: SignedBeaconBlock<T::EthSpec>,
|
||||
) {
|
||||
let verified_block = match self.chain.verify_block_for_gossip(block) {
|
||||
Ok(verified_block) => {
|
||||
info!(
|
||||
self.log,
|
||||
"New block received";
|
||||
"slot" => verified_block.block.slot(),
|
||||
"hash" => verified_block.block_root.to_string()
|
||||
);
|
||||
self.propagate_gossip_message(message_id, peer_id.clone());
|
||||
verified_block
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
return;
|
||||
}
|
||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Gossip block is already known";
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not verify block for gossip";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
|
||||
|
||||
let block = Box::new(verified_block.block.clone());
|
||||
match self.chain.process_block(verified_block) {
|
||||
Ok(_block_root) => {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Gossipsub block processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
|
||||
// TODO: It would be better if we can run this _after_ we publish the block to
|
||||
// reduce block propagation latency.
|
||||
//
|
||||
// The `MessageHandler` would be the place to put this, however it doesn't seem
|
||||
// to have a reference to the `BeaconChain`. I will leave this for future
|
||||
// works.
|
||||
match self.chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
self.log,
|
||||
"Fork choice success";
|
||||
"location" => "block gossip"
|
||||
),
|
||||
Err(e) => error!(
|
||||
self.log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "block gossip"
|
||||
),
|
||||
}
|
||||
}
|
||||
Err(BlockError::ParentUnknown { .. }) => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
// This should not occur. It should be checked by `should_forward_block`
|
||||
error!(
|
||||
self.log,
|
||||
"Block with unknown parent attempted to be processed";
|
||||
"peer_id" => peer_id.to_string()
|
||||
);
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Invalid gossip beacon block";
|
||||
"outcome" => format!("{:?}", other),
|
||||
"block root" => format!("{}", block.canonical_root()),
|
||||
"block slot" => block.slot()
|
||||
);
|
||||
trace!(
|
||||
self.log,
|
||||
"Invalid gossip beacon block ssz";
|
||||
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Attempt to process a block received from a direct RPC request, returning the processing
|
||||
/// result on the `result_tx` channel.
|
||||
///
|
||||
/// Raises a log if there are errors publishing the result to the channel.
|
||||
pub fn process_rpc_block(
|
||||
self,
|
||||
block: SignedBeaconBlock<T::EthSpec>,
|
||||
result_tx: BlockResultSender<T::EthSpec>,
|
||||
) {
|
||||
let block_result = self.chain.process_block(block);
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
if result_tx.send(block_result).is_err() {
|
||||
crit!(self.log, "Failed return sync block result");
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
|
||||
/// thread if more blocks are needed to process it.
|
||||
pub fn process_chain_segment(
|
||||
self,
|
||||
process_id: ProcessId,
|
||||
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log)
|
||||
}
|
||||
|
||||
/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on
|
||||
/// the gossip network.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) {
|
||||
self.network_tx
|
||||
.send(NetworkMessage::Validate {
|
||||
propagation_source: peer_id,
|
||||
message_id,
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not send propagation request to the network service"
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// Send a message to `sync_tx`.
|
||||
///
|
||||
/// Creates a log if there is an interal error.
|
||||
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
|
||||
self.sync_tx
|
||||
.send(message)
|
||||
.unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service"));
|
||||
}
|
||||
|
||||
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
|
||||
/// network.
|
||||
pub fn handle_attestation_verification_failure(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
beacon_block_root: Hash256,
|
||||
attestation_type: &str,
|
||||
error: AttnError,
|
||||
) {
|
||||
metrics::register_attestation_error(&error);
|
||||
match &error {
|
||||
AttnError::FutureEpoch { .. }
|
||||
| AttnError::PastEpoch { .. }
|
||||
| AttnError::FutureSlot { .. }
|
||||
| AttnError::PastSlot { .. } => {
|
||||
/*
|
||||
* These errors can be triggered by a mismatch between our slot and the peer.
|
||||
*
|
||||
*
|
||||
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
|
||||
*/
|
||||
}
|
||||
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
|
||||
/*
|
||||
* These errors are caused by invalid signatures.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::EmptyAggregationBitfield => {
|
||||
/*
|
||||
* The aggregate had no signatures and is therefore worthless.
|
||||
*
|
||||
* Whilst we don't gossip this attestation, this act is **not** a clear
|
||||
* violation of the spec nor indication of fault.
|
||||
*
|
||||
* This may change soon. Reference:
|
||||
*
|
||||
* https://github.com/ethereum/eth2.0-specs/pull/1732
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorPubkeyUnknown(_) => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AggregatorNotInCommittee { .. } => {
|
||||
/*
|
||||
* The aggregator index was higher than any known validator index. This is
|
||||
* possible in two cases:
|
||||
*
|
||||
* 1. The attestation is malformed
|
||||
* 2. The attestation attests to a beacon_block_root that we do not know.
|
||||
*
|
||||
* It should be impossible to reach (2) without triggering
|
||||
* `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
|
||||
* faulty.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Attestation already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* There has already been an aggregate attestation seen from this
|
||||
* aggregator index.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Aggregator already known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::PriorAttestationKnown { .. } => {
|
||||
/*
|
||||
* We have already seen an attestation from this validator for this epoch.
|
||||
*
|
||||
* The peer is not necessarily faulty.
|
||||
*/
|
||||
trace!(
|
||||
self.log,
|
||||
"Prior attestation known";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
return;
|
||||
}
|
||||
AttnError::ValidatorIndexTooHigh(_) => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::UnknownHeadBlock { beacon_block_root } => {
|
||||
// Note: its a little bit unclear as to whether or not this block is unknown or
|
||||
// just old. See:
|
||||
//
|
||||
// https://github.com/sigp/lighthouse/issues/1039
|
||||
|
||||
// TODO: Maintain this attestation and re-process once sync completes
|
||||
debug!(
|
||||
self.log,
|
||||
"Attestation for unknown block";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"block" => format!("{}", beacon_block_root)
|
||||
);
|
||||
// we don't know the block, get the sync manager to handle the block lookup
|
||||
self.sync_tx
|
||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Failed to send to sync service";
|
||||
"msg" => "UnknownBlockHash"
|
||||
)
|
||||
});
|
||||
return;
|
||||
}
|
||||
AttnError::UnknownTargetRoot(_) => {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
* 2. We do not have the target root in our DB.
|
||||
*
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::BadTargetEpoch => {
|
||||
/*
|
||||
* The aggregator index (or similar field) was higher than the maximum
|
||||
* possible number of validators.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NoCommitteeForSlotAndIndex { .. } => {
|
||||
/*
|
||||
* It is not possible to attest this the given committee in the given slot.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::NotExactlyOneAggregationBitSet(_) => {
|
||||
/*
|
||||
* The unaggregated attestation doesn't have only one signature.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::AttestsToFutureBlock { .. } => {
|
||||
/*
|
||||
* The beacon_block_root is from a higher slot than the attestation.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
|
||||
AttnError::InvalidSubnetId { received, expected } => {
|
||||
/*
|
||||
* The attestation was received on an incorrect subnet id.
|
||||
*/
|
||||
debug!(
|
||||
self.log,
|
||||
"Received attestation on incorrect subnet";
|
||||
"expected" => format!("{:?}", expected),
|
||||
"received" => format!("{:?}", received),
|
||||
)
|
||||
}
|
||||
AttnError::Invalid(_) => {
|
||||
/*
|
||||
* The attestation failed the state_processing verification.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
}
|
||||
AttnError::TooManySkippedSlots {
|
||||
head_block_slot,
|
||||
attestation_slot,
|
||||
} => {
|
||||
/*
|
||||
* The attestation references a head block that is too far behind the attestation slot.
|
||||
*
|
||||
* The message is not necessarily invalid, but we choose to ignore it.
|
||||
*/
|
||||
debug!(
|
||||
self.log,
|
||||
"Rejected long skip slot attestation";
|
||||
"head_block_slot" => head_block_slot,
|
||||
"attestation_slot" => attestation_slot,
|
||||
)
|
||||
}
|
||||
AttnError::BeaconChainError(e) => {
|
||||
/*
|
||||
* Lighthouse hit an unexpected error whilst processing the attestation. It
|
||||
* should be impossible to trigger a `BeaconChainError` from the network,
|
||||
* so we have a bug.
|
||||
*
|
||||
* It's not clear if the message is invalid/malicious.
|
||||
*/
|
||||
error!(
|
||||
self.log,
|
||||
"Unable to validate aggregate";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Invalid attestation from network";
|
||||
"reason" => format!("{:?}", error),
|
||||
"block" => format!("{}", beacon_block_root),
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"type" => format!("{:?}", attestation_type),
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user