Merge branch 'remove-into-gossip-verified-block' of https://github.com/realbigsean/lighthouse into merge-unstable-deneb-june-6th

This commit is contained in:
realbigsean
2023-07-06 16:51:35 -04:00
78 changed files with 3075 additions and 407 deletions

View File

@@ -836,6 +836,24 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
}
}
pub struct BeaconProcessorSend<T: BeaconChainTypes>(pub mpsc::Sender<WorkEvent<T>>);
impl<T: BeaconChainTypes> BeaconProcessorSend<T> {
pub fn try_send(&self, message: WorkEvent<T>) -> Result<(), Box<TrySendError<WorkEvent<T>>>> {
let work_type = message.work_type();
match self.0.try_send(message) {
Ok(res) => Ok(res),
Err(e) => {
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
&[work_type],
);
Err(Box::new(e))
}
}
}
}
/// A consensus message (or multiple) from the network that requires processing.
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]

View File

@@ -929,6 +929,20 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block
}
Err(e @ BlockError::Slashable) => {
warn!(
self.log,
"Received equivocating block from peer";
"error" => ?e
);
/* punish peer for submitting an equivocation, but not too harshly as honest peers may conceivably forward equivocating blocks to us from time to time */
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"gossip_block_mid",
);
return None;
}
Err(BlockError::ParentUnknown(block)) => {
debug!(
self.log,
@@ -950,7 +964,6 @@ impl<T: BeaconChainTypes> Worker<T> {
Err(e @ BlockError::FutureSlot { .. })
| Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
| Err(e @ BlockError::BlockIsAlreadyKnown)
| Err(e @ BlockError::RepeatProposal { .. })
| Err(e @ BlockError::NotFinalizedDescendant { .. }) => {
debug!(self.log, "Could not verify block for gossip. Ignoring the block";
"error" => %e);
@@ -1103,7 +1116,12 @@ impl<T: BeaconChainTypes> Worker<T> {
let result = self
.chain
.process_block(block_root, verified_block, NotifyExecutionLayer::Yes)
.process_block(
block_root,
verified_block,
NotifyExecutionLayer::Yes,
|| Ok(()),
)
.await;
match &result {

View File

@@ -103,33 +103,21 @@ impl<T: BeaconChainTypes> Worker<T> {
});
// Checks if a block from this proposer is already known.
let proposal_already_known = || {
let block_equivocates = || {
match self
.chain
.observed_block_producers
.read()
.proposer_has_been_observed(block.message())
.proposer_has_been_observed(block.message(), block.canonical_root())
{
Ok(is_observed) => is_observed,
// Both of these blocks will be rejected, so reject them now rather
Ok(seen_status) => seen_status.is_slashable(),
//Both of these blocks will be rejected, so reject them now rather
// than re-queuing them.
Err(ObserveError::FinalizedBlock { .. })
| Err(ObserveError::ValidatorIndexTooHigh { .. }) => false,
}
};
// Returns `true` if the block is already known to fork choice. Notably,
// this will return `false` for blocks that we've already imported but
// ancestors of the finalized checkpoint. That should not be an issue
// for our use here since finalized blocks will always be late and won't
// be requeued anyway.
let block_is_already_known = || {
self.chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
};
// If we've already seen a block from this proposer *and* the block
// arrived before the attestation deadline, requeue it to ensure it is
// imported late enough that it won't receive a proposer boost.
@@ -137,7 +125,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Don't requeue blocks if they're already known to fork choice, just
// push them through to block processing so they can be handled through
// the normal channels.
if !block_is_late && proposal_already_known() && !block_is_already_known() {
if !block_is_late && block_equivocates() {
debug!(
self.log,
"Delaying processing of duplicate RPC block";
@@ -171,7 +159,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let result = self
.chain
.process_block(block_root, block, NotifyExecutionLayer::Yes)
.process_block(block_root, block, NotifyExecutionLayer::Yes, || Ok(()))
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);

View File

@@ -301,6 +301,12 @@ lazy_static! {
"Gossipsub light_client_optimistic_update errors per error type",
&["type"]
);
pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"beacon_processor_send_error_per_work_type",
"Total number of beacon processor send error per work type",
&["type"]
);
/*

View File

@@ -6,7 +6,8 @@
#![allow(clippy::unit_arg)]
use crate::beacon_processor::{
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
BeaconProcessor, BeaconProcessorSend, InvalidBlockStorage, WorkEvent as BeaconWorkEvent,
MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::error;
use crate::service::{NetworkMessage, RequestId};
@@ -14,11 +15,14 @@ use crate::status::status_message;
use crate::sync::manager::RequestId as SyncId;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::{future, StreamExt};
use lighthouse_network::{rpc::*, PubsubMessage};
use lighthouse_network::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response};
use slog::{debug, error, o, trace, warn};
use futures::prelude::*;
use lighthouse_network::rpc::*;
use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use logging::TimeLatch;
use slog::{debug, o, trace};
use slog::{error, warn};
use std::cmp;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -37,9 +41,11 @@ pub struct Router<T: BeaconChainTypes> {
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
/// The `Router` logger.
log: slog::Logger,
/// Provides de-bounce functionality for logging.
logger_debounce: TimeLatch,
}
/// Types of messages the router can receive.
@@ -99,7 +105,7 @@ impl<T: BeaconChainTypes> Router<T> {
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
beacon_processor_send.clone(),
BeaconProcessorSend(beacon_processor_send.clone()),
sync_logger,
);
@@ -123,8 +129,9 @@ impl<T: BeaconChainTypes> Router<T> {
chain: beacon_chain,
sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()),
beacon_processor_send,
beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
log: message_handler_log,
logger_debounce: TimeLatch::default(),
};
// spawn handler task and move the message handler instance into the spawned thread
@@ -570,12 +577,15 @@ impl<T: BeaconChainTypes> Router<T> {
self.beacon_processor_send
.try_send(work)
.unwrap_or_else(|e| {
let work_type = match &e {
let work_type = match &*e {
mpsc::error::TrySendError::Closed(work)
| mpsc::error::TrySendError::Full(work) => work.work_type(),
};
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
if self.logger_debounce.elapsed() {
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
}
})
}
}

View File

@@ -1,6 +1,7 @@
#![cfg(feature = "spec-minimal")]
use std::sync::Arc;
use crate::beacon_processor::BeaconProcessorSend;
use crate::service::RequestId;
use crate::sync::manager::RequestId as SyncId;
use crate::NetworkMessage;
@@ -80,7 +81,7 @@ impl TestRig {
SyncNetworkContext::new(
network_tx,
globals,
beacon_processor_tx,
BeaconProcessorSend(beacon_processor_tx),
chain,
log.new(slog::o!("component" => "network_context")),
)

View File

@@ -38,7 +38,7 @@ use super::block_lookups::{BlockLookups, PeerShouldHave};
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::beacon_processor::{BeaconProcessorSend, ChainSegmentProcessId};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::delayed_lookup;
@@ -238,7 +238,7 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
log: slog::Logger,
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
assert!(

View File

@@ -4,7 +4,7 @@
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::beacon_processor::WorkEvent;
use crate::beacon_processor::BeaconProcessorSend;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId};
@@ -60,7 +60,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
execution_engine_state: EngineState,
/// Channel to send work to the beacon processor.
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
pub chain: Arc<BeaconChain<T>>,
@@ -90,7 +90,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
chain: Arc<BeaconChain<T>>,
log: slog::Logger,
) -> Self {
@@ -564,12 +564,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender<WorkEvent<T>>> {
pub fn processor_channel_if_enabled(&self) -> Option<&BeaconProcessorSend<T>> {
self.is_execution_engine_online()
.then_some(&self.beacon_processor_send)
}
pub fn processor_channel(&self) -> &mpsc::Sender<WorkEvent<T>> {
pub fn processor_channel(&self) -> &BeaconProcessorSend<T> {
&self.beacon_processor_send
}

View File

@@ -381,7 +381,7 @@ where
mod tests {
use super::*;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::beacon_processor::{BeaconProcessorSend, WorkEvent as BeaconWorkEvent};
use crate::service::RequestId;
use crate::NetworkMessage;
use beacon_chain::{
@@ -610,7 +610,7 @@ mod tests {
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),
beacon_processor_tx,
BeaconProcessorSend(beacon_processor_tx),
chain,
log.new(o!("component" => "network_context")),
);