mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-21 22:04:44 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states
This commit is contained in:
@@ -750,6 +750,24 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BeaconProcessorSend<T: BeaconChainTypes>(pub mpsc::Sender<WorkEvent<T>>);
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconProcessorSend<T> {
|
||||
pub fn try_send(&self, message: WorkEvent<T>) -> Result<(), Box<TrySendError<WorkEvent<T>>>> {
|
||||
let work_type = message.work_type();
|
||||
match self.0.try_send(message) {
|
||||
Ok(res) => Ok(res),
|
||||
Err(e) => {
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
|
||||
&[work_type],
|
||||
);
|
||||
Err(Box::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A consensus message (or multiple) from the network that requires processing.
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
|
||||
|
||||
@@ -785,6 +785,20 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
|
||||
verified_block
|
||||
}
|
||||
Err(e @ BlockError::Slashable) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Received equivocating block from peer";
|
||||
"error" => ?e
|
||||
);
|
||||
/* punish peer for submitting an equivocation, but not too harshly as honest peers may conceivably forward equivocating blocks to us from time to time */
|
||||
self.gossip_penalize_peer(
|
||||
peer_id,
|
||||
PeerAction::MidToleranceError,
|
||||
"gossip_block_mid",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
@@ -806,7 +820,6 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
Err(e @ BlockError::FutureSlot { .. })
|
||||
| Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
|
||||
| Err(e @ BlockError::BlockIsAlreadyKnown)
|
||||
| Err(e @ BlockError::RepeatProposal { .. })
|
||||
| Err(e @ BlockError::NotFinalizedDescendant { .. }) => {
|
||||
debug!(self.log, "Could not verify block for gossip. Ignoring the block";
|
||||
"error" => %e);
|
||||
@@ -835,7 +848,6 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
| Err(e @ BlockError::NonLinearParentRoots)
|
||||
| Err(e @ BlockError::BlockIsNotLaterThanParent { .. })
|
||||
| Err(e @ BlockError::InvalidSignature)
|
||||
| Err(e @ BlockError::TooManySkippedSlots { .. })
|
||||
| Err(e @ BlockError::WeakSubjectivityConflict)
|
||||
| Err(e @ BlockError::InconsistentFork(_))
|
||||
| Err(e @ BlockError::ExecutionPayloadError(_))
|
||||
@@ -949,7 +961,12 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
|
||||
let result = self
|
||||
.chain
|
||||
.process_block(block_root, verified_block, NotifyExecutionLayer::Yes)
|
||||
.process_block(
|
||||
block_root,
|
||||
verified_block,
|
||||
NotifyExecutionLayer::Yes,
|
||||
|| Ok(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
match &result {
|
||||
@@ -1736,7 +1753,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"attn_agg_not_in_committee",
|
||||
);
|
||||
}
|
||||
AttnError::AttestationAlreadyKnown { .. } => {
|
||||
AttnError::AttestationSupersetKnown { .. } => {
|
||||
/*
|
||||
* The aggregate attestation has already been observed on the network or in
|
||||
* a block.
|
||||
@@ -2245,7 +2262,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"sync_bad_aggregator",
|
||||
);
|
||||
}
|
||||
SyncCommitteeError::SyncContributionAlreadyKnown(_)
|
||||
SyncCommitteeError::SyncContributionSupersetKnown(_)
|
||||
| SyncCommitteeError::AggregatorAlreadyKnown(_) => {
|
||||
/*
|
||||
* The sync committee message already been observed on the network or in
|
||||
|
||||
@@ -98,33 +98,21 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
});
|
||||
|
||||
// Checks if a block from this proposer is already known.
|
||||
let proposal_already_known = || {
|
||||
let block_equivocates = || {
|
||||
match self
|
||||
.chain
|
||||
.observed_block_producers
|
||||
.read()
|
||||
.proposer_has_been_observed(block.message())
|
||||
.proposer_has_been_observed(block.message(), block.canonical_root())
|
||||
{
|
||||
Ok(is_observed) => is_observed,
|
||||
// Both of these blocks will be rejected, so reject them now rather
|
||||
Ok(seen_status) => seen_status.is_slashable(),
|
||||
//Both of these blocks will be rejected, so reject them now rather
|
||||
// than re-queuing them.
|
||||
Err(ObserveError::FinalizedBlock { .. })
|
||||
| Err(ObserveError::ValidatorIndexTooHigh { .. }) => false,
|
||||
}
|
||||
};
|
||||
|
||||
// Returns `true` if the block is already known to fork choice. Notably,
|
||||
// this will return `false` for blocks that we've already imported but
|
||||
// ancestors of the finalized checkpoint. That should not be an issue
|
||||
// for our use here since finalized blocks will always be late and won't
|
||||
// be requeued anyway.
|
||||
let block_is_already_known = || {
|
||||
self.chain
|
||||
.canonical_head
|
||||
.fork_choice_read_lock()
|
||||
.contains_block(&block_root)
|
||||
};
|
||||
|
||||
// If we've already seen a block from this proposer *and* the block
|
||||
// arrived before the attestation deadline, requeue it to ensure it is
|
||||
// imported late enough that it won't receive a proposer boost.
|
||||
@@ -132,7 +120,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
// Don't requeue blocks if they're already known to fork choice, just
|
||||
// push them through to block processing so they can be handled through
|
||||
// the normal channels.
|
||||
if !block_is_late && proposal_already_known() && !block_is_already_known() {
|
||||
if !block_is_late && block_equivocates() {
|
||||
debug!(
|
||||
self.log,
|
||||
"Delaying processing of duplicate RPC block";
|
||||
@@ -165,7 +153,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
let parent_root = block.message().parent_root();
|
||||
let result = self
|
||||
.chain
|
||||
.process_block(block_root, block, NotifyExecutionLayer::Yes)
|
||||
.process_block(block_root, block, NotifyExecutionLayer::Yes, || Ok(()))
|
||||
.await;
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
@@ -279,6 +279,12 @@ lazy_static! {
|
||||
"Gossipsub light_client_optimistic_update errors per error type",
|
||||
&["type"]
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result<IntCounterVec> =
|
||||
try_create_int_counter_vec(
|
||||
"beacon_processor_send_error_per_work_type",
|
||||
"Total number of beacon processor send error per work type",
|
||||
&["type"]
|
||||
);
|
||||
|
||||
|
||||
/*
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
#![allow(clippy::unit_arg)]
|
||||
|
||||
use crate::beacon_processor::{
|
||||
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
|
||||
BeaconProcessor, BeaconProcessorSend, InvalidBlockStorage, WorkEvent as BeaconWorkEvent,
|
||||
MAX_WORK_EVENT_QUEUE_LEN,
|
||||
};
|
||||
use crate::error;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
@@ -19,6 +20,7 @@ use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
|
||||
};
|
||||
use logging::TimeLatch;
|
||||
use slog::{debug, o, trace};
|
||||
use slog::{error, warn};
|
||||
use std::cmp;
|
||||
@@ -39,9 +41,11 @@ pub struct Router<T: BeaconChainTypes> {
|
||||
/// A network context to return and handle RPC requests.
|
||||
network: HandlerNetworkContext<T::EthSpec>,
|
||||
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
|
||||
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
|
||||
beacon_processor_send: BeaconProcessorSend<T>,
|
||||
/// The `Router` logger.
|
||||
log: slog::Logger,
|
||||
/// Provides de-bounce functionality for logging.
|
||||
logger_debounce: TimeLatch,
|
||||
}
|
||||
|
||||
/// Types of messages the router can receive.
|
||||
@@ -100,7 +104,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
beacon_chain.clone(),
|
||||
network_globals.clone(),
|
||||
network_send.clone(),
|
||||
beacon_processor_send.clone(),
|
||||
BeaconProcessorSend(beacon_processor_send.clone()),
|
||||
sync_logger,
|
||||
);
|
||||
|
||||
@@ -124,8 +128,9 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
chain: beacon_chain,
|
||||
sync_send,
|
||||
network: HandlerNetworkContext::new(network_send, log.clone()),
|
||||
beacon_processor_send,
|
||||
beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
|
||||
log: message_handler_log,
|
||||
logger_debounce: TimeLatch::default(),
|
||||
};
|
||||
|
||||
// spawn handler task and move the message handler instance into the spawned thread
|
||||
@@ -479,12 +484,15 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
self.beacon_processor_send
|
||||
.try_send(work)
|
||||
.unwrap_or_else(|e| {
|
||||
let work_type = match &e {
|
||||
let work_type = match &*e {
|
||||
mpsc::error::TrySendError::Closed(work)
|
||||
| mpsc::error::TrySendError::Full(work) => work.work_type(),
|
||||
};
|
||||
error!(&self.log, "Unable to send message to the beacon processor";
|
||||
"error" => %e, "type" => work_type)
|
||||
|
||||
if self.logger_debounce.elapsed() {
|
||||
error!(&self.log, "Unable to send message to the beacon processor";
|
||||
"error" => %e, "type" => work_type)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::beacon_processor::BeaconProcessorSend;
|
||||
use crate::service::RequestId;
|
||||
use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::NetworkMessage;
|
||||
@@ -54,7 +55,7 @@ impl TestRig {
|
||||
SyncNetworkContext::new(
|
||||
network_tx,
|
||||
globals,
|
||||
beacon_processor_tx,
|
||||
BeaconProcessorSend(beacon_processor_tx),
|
||||
log.new(slog::o!("component" => "network_context")),
|
||||
)
|
||||
};
|
||||
|
||||
@@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups;
|
||||
use super::network_context::SyncNetworkContext;
|
||||
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
|
||||
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
|
||||
use crate::beacon_processor::{BeaconProcessorSend, ChainSegmentProcessId};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
|
||||
@@ -188,7 +188,7 @@ pub fn spawn<T: BeaconChainTypes>(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
|
||||
beacon_processor_send: BeaconProcessorSend<T>,
|
||||
log: slog::Logger,
|
||||
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
|
||||
assert!(
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use super::manager::{Id, RequestId as SyncRequestId};
|
||||
use super::range_sync::{BatchId, ChainId};
|
||||
use crate::beacon_processor::WorkEvent;
|
||||
use crate::beacon_processor::BeaconProcessorSend;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::status::ToStatusMessage;
|
||||
use beacon_chain::{BeaconChainTypes, EngineState};
|
||||
@@ -37,7 +37,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
||||
execution_engine_state: EngineState,
|
||||
|
||||
/// Channel to send work to the beacon processor.
|
||||
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
|
||||
beacon_processor_send: BeaconProcessorSend<T>,
|
||||
|
||||
/// Logger for the `SyncNetworkContext`.
|
||||
log: slog::Logger,
|
||||
@@ -47,7 +47,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
pub fn new(
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
|
||||
beacon_processor_send: BeaconProcessorSend<T>,
|
||||
log: slog::Logger,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -278,12 +278,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender<WorkEvent<T>>> {
|
||||
pub fn processor_channel_if_enabled(&self) -> Option<&BeaconProcessorSend<T>> {
|
||||
self.is_execution_engine_online()
|
||||
.then_some(&self.beacon_processor_send)
|
||||
}
|
||||
|
||||
pub fn processor_channel(&self) -> &mpsc::Sender<WorkEvent<T>> {
|
||||
pub fn processor_channel(&self) -> &BeaconProcessorSend<T> {
|
||||
&self.beacon_processor_send
|
||||
}
|
||||
|
||||
|
||||
@@ -375,7 +375,7 @@ mod tests {
|
||||
use crate::NetworkMessage;
|
||||
|
||||
use super::*;
|
||||
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use crate::beacon_processor::{BeaconProcessorSend, WorkEvent as BeaconWorkEvent};
|
||||
use beacon_chain::builder::Witness;
|
||||
use beacon_chain::eth1_chain::CachingEth1Backend;
|
||||
use beacon_chain::parking_lot::RwLock;
|
||||
@@ -603,7 +603,7 @@ mod tests {
|
||||
let cx = SyncNetworkContext::new(
|
||||
network_tx,
|
||||
globals.clone(),
|
||||
beacon_processor_tx,
|
||||
BeaconProcessorSend(beacon_processor_tx),
|
||||
log.new(o!("component" => "network_context")),
|
||||
);
|
||||
let test_rig = TestRig {
|
||||
|
||||
Reference in New Issue
Block a user