mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
1721 lines
78 KiB
Rust
1721 lines
78 KiB
Rust
//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network
//! that need to be processed by the `BeaconChain`.
//!
//! Uses `tokio` tasks (instead of raw threads) to provide the following tasks:
//!
//! - A "manager" task, which either spawns worker tasks or enqueues work.
//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`.
//! - A task managing the scheduling of work that needs to be re-processed.
//!
//! ## Purpose
//!
//! The purpose of the `BeaconProcessor` is to provide two things:
//!
//! 1. Moving long-running, blocking tasks off the main `tokio` executor.
//! 2. A fixed-length buffer for consensus messages.
//!
//! (1) ensures that we don't clog up the networking stack with long-running tasks, potentially
//! causing timeouts. (2) means that we can easily and explicitly reject messages when we're
//! overloaded and also distribute load across time.
//!
//! ## Detail
//!
//! There is a single "manager" task which listens to three event channels. These events are
//! either:
//!
//! - A new parcel of work (work event).
//! - Indication that a worker has finished a parcel of work (worker idle).
//! - A parcel of work that is ready for re-processing (work event).
//!
//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
//!
//! Whenever the manager receives a new parcel of work, it is either:
//!
//! - Provided to a newly-spawned worker task (if we are not already at `n` workers).
//! - Added to a queue.
//!
//! Whenever the manager receives a notification that a worker has finished a parcel of work, it
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.
|
|
|
|
use crate::work_reprocessing_queue::{
|
|
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
|
|
};
|
|
use futures::stream::{Stream, StreamExt};
|
|
use futures::task::Poll;
|
|
use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
|
|
use logging::TimeLatch;
|
|
use logging::crit;
|
|
use parking_lot::Mutex;
|
|
pub use scheduler::work_reprocessing_queue;
|
|
use serde::{Deserialize, Serialize};
|
|
use slot_clock::SlotClock;
|
|
use std::cmp;
|
|
use std::collections::{HashSet, VecDeque};
|
|
use std::fmt;
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::task::Context;
|
|
use std::time::{Duration, Instant};
|
|
use strum::IntoStaticStr;
|
|
use task_executor::TaskExecutor;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::mpsc::error::TrySendError;
|
|
use tracing::{debug, error, trace, warn};
|
|
use types::{
|
|
BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
|
|
SingleAttestation, Slot, SubnetId,
|
|
};
|
|
use work_reprocessing_queue::IgnoredRpcBlock;
|
|
use work_reprocessing_queue::{
|
|
QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
|
|
spawn_reprocess_scheduler,
|
|
};
|
|
|
|
mod metrics;
|
|
pub mod scheduler;
|
|
|
|
/// Default capacity of the channel that carries work events to the `BeaconProcessor`.
///
/// If this is set too low, consensus messages will be dropped.
const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// Capacity of the channel that carries idle events to the `BeaconProcessor`.
///
/// If this is set too low, new workers cannot be spawned. The CPU count *should* be
/// sufficient, but a much larger value is used to be safe.
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// Default capacity of the channel for re-processing work events: three quarters of
/// `DEFAULT_MAX_WORK_EVENT_QUEUE_LEN`.
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// Percentage by which the active validator count is scaled when sizing queues.
///
/// The beacon chain has strict churn limits that prevent the validator set size from changing
/// rapidly, so a slight over-provision means the queues never need to be resized during the
/// lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

/// Lower bound for dynamically sized queues.
///
/// Integer division could otherwise yield a zero-length queue, and the processor won't process
/// a message type whose queue has no capacity. 128 is an arbitrary value >= 1 that seems
/// reasonable.
const MIN_QUEUE_LEN: usize = 128;
|
|
|
|
/// Per-queue capacity limits for the `BeaconProcessor`.
///
/// Each field is the maximum number of items the corresponding work queue will hold before
/// items start being dropped.
pub struct BeaconProcessorQueueLengths {
    // Attestations, aggregates and sync committee messages.
    aggregate_queue: usize,
    attestation_queue: usize,
    unknown_block_aggregate_queue: usize,
    unknown_block_attestation_queue: usize,
    sync_message_queue: usize,
    sync_contribution_queue: usize,

    // Gossip operations.
    gossip_voluntary_exit_queue: usize,
    gossip_proposer_slashing_queue: usize,
    gossip_attester_slashing_queue: usize,
    unknown_light_client_update_queue: usize,

    // Blocks, blobs and data columns.
    rpc_block_queue: usize,
    rpc_blob_queue: usize,
    rpc_custody_column_queue: usize,
    column_reconstruction_queue: usize,
    chain_segment_queue: usize,
    backfill_chain_segment: usize,
    gossip_block_queue: usize,
    gossip_blob_queue: usize,
    gossip_data_column_queue: usize,
    delayed_block_queue: usize,

    // Status and by-range / by-roots requests.
    status_queue: usize,
    bbrange_queue: usize,
    bbroots_queue: usize,
    blbroots_queue: usize,
    blbrange_queue: usize,
    dcbroots_queue: usize,
    dcbrange_queue: usize,

    gossip_bls_to_execution_change_queue: usize,

    // Light client.
    lc_gossip_finality_update_queue: usize,
    lc_gossip_optimistic_update_queue: usize,
    lc_bootstrap_queue: usize,
    lc_rpc_optimistic_update_queue: usize,
    lc_rpc_finality_update_queue: usize,
    lc_update_range_queue: usize,

    // API requests, by priority.
    api_request_p0_queue: usize,
    api_request_p1_queue: usize,
}
|
|
|
|
impl BeaconProcessorQueueLengths {
|
|
pub fn from_state<E: EthSpec>(
|
|
state: &BeaconState<E>,
|
|
spec: &ChainSpec,
|
|
) -> Result<Self, String> {
|
|
let active_validator_count =
|
|
match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
|
|
Ok(indices) => indices.len(),
|
|
Err(_) => state
|
|
.get_active_validator_indices(state.current_epoch(), spec)
|
|
.map_err(|e| format!("Error computing active indices: {:?}", e))?
|
|
.len(),
|
|
};
|
|
let active_validator_count =
|
|
(ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
|
|
let slots_per_epoch = E::slots_per_epoch() as usize;
|
|
|
|
Ok(Self {
|
|
aggregate_queue: 4096,
|
|
unknown_block_aggregate_queue: 1024,
|
|
// Capacity for a full slot's worth of attestations if subscribed to all subnets
|
|
attestation_queue: std::cmp::max(
|
|
active_validator_count / slots_per_epoch,
|
|
MIN_QUEUE_LEN,
|
|
),
|
|
// Capacity for a full slot's worth of attestations if subscribed to all subnets
|
|
unknown_block_attestation_queue: std::cmp::max(
|
|
active_validator_count / slots_per_epoch,
|
|
MIN_QUEUE_LEN,
|
|
),
|
|
sync_message_queue: 2048,
|
|
sync_contribution_queue: 1024,
|
|
gossip_voluntary_exit_queue: 4096,
|
|
gossip_proposer_slashing_queue: 4096,
|
|
gossip_attester_slashing_queue: 4096,
|
|
unknown_light_client_update_queue: 128,
|
|
rpc_block_queue: 1024,
|
|
rpc_blob_queue: 1024,
|
|
// We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit
|
|
// this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB.
|
|
rpc_custody_column_queue: 64,
|
|
column_reconstruction_queue: 1,
|
|
chain_segment_queue: 64,
|
|
backfill_chain_segment: 64,
|
|
gossip_block_queue: 1024,
|
|
gossip_blob_queue: 1024,
|
|
gossip_data_column_queue: 1024,
|
|
delayed_block_queue: 1024,
|
|
status_queue: 1024,
|
|
bbrange_queue: 1024,
|
|
bbroots_queue: 1024,
|
|
blbroots_queue: 1024,
|
|
blbrange_queue: 1024,
|
|
dcbroots_queue: 1024,
|
|
dcbrange_queue: 1024,
|
|
gossip_bls_to_execution_change_queue: 16384,
|
|
lc_gossip_finality_update_queue: 1024,
|
|
lc_gossip_optimistic_update_queue: 1024,
|
|
lc_bootstrap_queue: 1024,
|
|
lc_rpc_optimistic_update_queue: 512,
|
|
lc_rpc_finality_update_queue: 512,
|
|
lc_update_range_queue: 512,
|
|
api_request_p0_queue: 1024,
|
|
api_request_p1_queue: 1024,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Name given to the manager `tokio` task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

/// Name given to each worker `tokio` task.
const WORKER_TASK_NAME: &str = "beacon_processor_worker";

/// The `DEFAULT_MAX_..._BATCH_SIZE` constants define how many attestations can be included in a
/// single batch by default.
///
/// Choosing these values is difficult since there is a trade-off between:
///
/// - It is faster to verify one large batch than multiple smaller batches.
/// - "Poisoning" attacks have a larger impact as the batch size increases.
///
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall back to
/// individually verifying each attestation signature.
const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;

/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
pub const NOTHING_TO_DO: &str = "nothing_to_do";
|
|
|
|
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
|
|
pub struct BeaconProcessorConfig {
|
|
pub max_workers: usize,
|
|
pub max_work_event_queue_len: usize,
|
|
pub max_scheduled_work_queue_len: usize,
|
|
pub max_gossip_attestation_batch_size: usize,
|
|
pub max_gossip_aggregate_batch_size: usize,
|
|
pub enable_backfill_rate_limiting: bool,
|
|
}
|
|
|
|
impl Default for BeaconProcessorConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
max_workers: cmp::max(1, num_cpus::get()),
|
|
max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
|
|
max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
|
|
max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
|
|
max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
|
|
enable_backfill_rate_limiting: true,
|
|
}
|
|
}
|
|
}
|
|
|
|
// The channels necessary to instantiate a `BeaconProcessor`.
|
|
pub struct BeaconProcessorChannels<E: EthSpec> {
|
|
pub beacon_processor_tx: BeaconProcessorSend<E>,
|
|
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
|
|
}
|
|
|
|
impl<E: EthSpec> BeaconProcessorChannels<E> {
|
|
pub fn new(config: &BeaconProcessorConfig) -> Self {
|
|
let (beacon_processor_tx, beacon_processor_rx) =
|
|
mpsc::channel(config.max_work_event_queue_len);
|
|
|
|
Self {
|
|
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
|
|
beacon_processor_rx,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
|
|
fn default() -> Self {
|
|
Self::new(&BeaconProcessorConfig::default())
|
|
}
|
|
}
|
|
|
|
/// A simple first-in-first-out queue with a maximum length.
|
|
struct FifoQueue<T> {
|
|
queue: VecDeque<T>,
|
|
max_length: usize,
|
|
}
|
|
|
|
impl<T> FifoQueue<T> {
|
|
/// Create a new, empty queue with the given length.
|
|
pub fn new(max_length: usize) -> Self {
|
|
Self {
|
|
queue: VecDeque::default(),
|
|
max_length,
|
|
}
|
|
}
|
|
|
|
/// Add a new item to the queue.
|
|
///
|
|
/// Drops `item` if the queue is full.
|
|
pub fn push(&mut self, item: T, item_desc: &str) {
|
|
if self.queue.len() == self.max_length {
|
|
error!(
|
|
msg = "the system has insufficient resources for load",
|
|
queue_len = self.max_length,
|
|
queue = item_desc,
|
|
"Work queue is full"
|
|
)
|
|
} else {
|
|
self.queue.push_back(item);
|
|
}
|
|
}
|
|
|
|
/// Remove the next item from the queue.
|
|
pub fn pop(&mut self) -> Option<T> {
|
|
self.queue.pop_front()
|
|
}
|
|
|
|
/// Returns the current length of the queue.
|
|
pub fn len(&self) -> usize {
|
|
self.queue.len()
|
|
}
|
|
}
|
|
|
|
/// A simple last-in-first-out queue with a maximum length.
///
/// The newest item is kept at the front of the underlying `VecDeque`. When the queue is at
/// capacity, the oldest item (at the back) is evicted to make room for a new one.
struct LifoQueue<T> {
    queue: VecDeque<T>,
    max_length: usize,
}

impl<T> LifoQueue<T> {
    /// Create a new, empty queue that holds at most `max_length` items.
    pub fn new(max_length: usize) -> Self {
        Self {
            queue: VecDeque::default(),
            max_length,
        }
    }

    /// Add a new item to the front of the queue.
    ///
    /// If the queue is full, the item at the back of the queue is dropped. A queue created with
    /// a `max_length` of zero cannot hold anything, so `item` itself is dropped.
    pub fn push(&mut self, item: T) {
        // Without this guard a zero-capacity queue would accept the first item (evicting
        // nothing) and then never satisfy `len == max_length` again, growing without bound.
        if self.max_length == 0 {
            return;
        }

        // `is_full` uses `>=`, so the bound holds even if the length ever exceeded the limit.
        if self.is_full() {
            self.queue.pop_back();
        }
        self.queue.push_front(item);
    }

    /// Remove the next (most recently pushed) item from the queue.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        self.queue.len() >= self.max_length
    }

    /// Returns the current length of the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}
|
|
|
|
/// A handle that sends a message on the provided channel to a receiver when it gets dropped.
|
|
///
|
|
/// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache`
|
|
/// and perform any other necessary cleanup.
|
|
pub struct DuplicateCacheHandle {
|
|
entry: Hash256,
|
|
cache: DuplicateCache,
|
|
}
|
|
|
|
impl Drop for DuplicateCacheHandle {
|
|
fn drop(&mut self) {
|
|
self.cache.remove(&self.entry);
|
|
}
|
|
}
|
|
|
|
/// A simple cache for detecting duplicate block roots across multiple threads.
|
|
#[derive(Clone, Default)]
|
|
pub struct DuplicateCache {
|
|
inner: Arc<Mutex<HashSet<Hash256>>>,
|
|
}
|
|
|
|
impl DuplicateCache {
|
|
/// Checks if the given block_root exists and inserts it into the cache if
|
|
/// it doesn't exist.
|
|
///
|
|
/// Returns a `Some(DuplicateCacheHandle)` if the block_root was successfully
|
|
/// inserted and `None` if the block root already existed in the cache.
|
|
///
|
|
/// The handle removes the entry from the cache when it is dropped. This ensures that any unclean
|
|
/// shutdowns in the worker tasks does not leave inconsistent state in the cache.
|
|
pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
|
|
let mut inner = self.inner.lock();
|
|
if inner.insert(block_root) {
|
|
Some(DuplicateCacheHandle {
|
|
entry: block_root,
|
|
cache: self.clone(),
|
|
})
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Remove the given block_root from the cache.
|
|
pub fn remove(&self, block_root: &Hash256) {
|
|
let mut inner = self.inner.lock();
|
|
inner.remove(block_root);
|
|
}
|
|
}
|
|
|
|
/// An event to be processed by the manager task.
|
|
#[derive(Debug)]
|
|
pub struct WorkEvent<E: EthSpec> {
|
|
pub drop_during_sync: bool,
|
|
pub work: Work<E>,
|
|
}
|
|
|
|
impl<E: EthSpec> WorkEvent<E> {
|
|
/// Get a representation of the type of work this `WorkEvent` contains.
|
|
pub fn work_type(&self) -> WorkType {
|
|
self.work.to_type()
|
|
}
|
|
|
|
/// Get a `str` representation of the type of work this `WorkEvent` contains.
|
|
pub fn work_type_str(&self) -> &'static str {
|
|
self.work_type().into()
|
|
}
|
|
}
|
|
|
|
impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
|
|
fn from(ready_work: ReadyWork) -> Self {
|
|
match ready_work {
|
|
ReadyWork::Block(QueuedGossipBlock {
|
|
beacon_block_slot,
|
|
beacon_block_root,
|
|
process_fn,
|
|
}) => Self {
|
|
drop_during_sync: false,
|
|
work: Work::DelayedImportBlock {
|
|
beacon_block_slot,
|
|
beacon_block_root,
|
|
process_fn,
|
|
},
|
|
},
|
|
ReadyWork::RpcBlock(QueuedRpcBlock {
|
|
beacon_block_root: _,
|
|
process_fn,
|
|
ignore_fn: _,
|
|
}) => Self {
|
|
drop_during_sync: false,
|
|
work: Work::RpcBlock { process_fn },
|
|
},
|
|
ReadyWork::IgnoredRpcBlock(IgnoredRpcBlock { process_fn }) => Self {
|
|
drop_during_sync: false,
|
|
work: Work::IgnoredRpcBlock { process_fn },
|
|
},
|
|
ReadyWork::Unaggregate(QueuedUnaggregate {
|
|
beacon_block_root: _,
|
|
process_fn,
|
|
}) => Self {
|
|
drop_during_sync: true,
|
|
work: Work::UnknownBlockAttestation { process_fn },
|
|
},
|
|
ReadyWork::Aggregate(QueuedAggregate {
|
|
process_fn,
|
|
beacon_block_root: _,
|
|
}) => Self {
|
|
drop_during_sync: true,
|
|
work: Work::UnknownBlockAggregate { process_fn },
|
|
},
|
|
ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
|
|
parent_root,
|
|
process_fn,
|
|
}) => Self {
|
|
drop_during_sync: true,
|
|
work: Work::UnknownLightClientOptimisticUpdate {
|
|
parent_root,
|
|
process_fn,
|
|
},
|
|
},
|
|
ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
|
|
drop_during_sync: false,
|
|
work: Work::ChainSegmentBackfill(process_fn),
|
|
},
|
|
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
|
|
Self {
|
|
drop_during_sync: true,
|
|
work: Work::ColumnReconstruction(process_fn),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Items required to verify a batch of unaggregated gossip attestations.
|
|
#[derive(Debug)]
|
|
pub struct GossipAttestationPackage<T> {
|
|
pub message_id: MessageId,
|
|
pub peer_id: PeerId,
|
|
pub attestation: Box<T>,
|
|
pub subnet_id: SubnetId,
|
|
pub should_import: bool,
|
|
pub seen_timestamp: Duration,
|
|
}
|
|
|
|
/// Items required to verify a batch of aggregated gossip attestations.
|
|
#[derive(Debug)]
|
|
pub struct GossipAggregatePackage<E: EthSpec> {
|
|
pub message_id: MessageId,
|
|
pub peer_id: PeerId,
|
|
pub aggregate: Box<SignedAggregateAndProof<E>>,
|
|
pub beacon_block_root: Hash256,
|
|
pub seen_timestamp: Duration,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct BeaconProcessorSend<E: EthSpec>(pub mpsc::Sender<WorkEvent<E>>);
|
|
|
|
impl<E: EthSpec> BeaconProcessorSend<E> {
|
|
pub fn try_send(&self, message: WorkEvent<E>) -> Result<(), TrySendError<WorkEvent<E>>> {
|
|
let work_type = message.work_type();
|
|
match self.0.try_send(message) {
|
|
Ok(res) => Ok(res),
|
|
Err(e) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
|
|
&[work_type.into()],
|
|
);
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
|
|
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
|
|
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
|
|
pub enum BlockingOrAsync {
|
|
Blocking(BlockingFn),
|
|
Async(AsyncFn),
|
|
}
|
|
pub type GossipAttestationBatch = Vec<GossipAttestationPackage<SingleAttestation>>;
|
|
|
|
/// Indicates the type of work to be performed and therefore its priority and
|
|
/// queuing specifics.
|
|
pub enum Work<E: EthSpec> {
|
|
GossipAttestation {
|
|
attestation: Box<GossipAttestationPackage<SingleAttestation>>,
|
|
process_individual:
|
|
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
|
|
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
|
|
},
|
|
UnknownBlockAttestation {
|
|
process_fn: BlockingFn,
|
|
},
|
|
GossipAttestationBatch {
|
|
attestations: GossipAttestationBatch,
|
|
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
|
|
},
|
|
GossipAggregate {
|
|
aggregate: Box<GossipAggregatePackage<E>>,
|
|
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
|
|
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
|
|
},
|
|
UnknownBlockAggregate {
|
|
process_fn: BlockingFn,
|
|
},
|
|
UnknownLightClientOptimisticUpdate {
|
|
parent_root: Hash256,
|
|
process_fn: BlockingFn,
|
|
},
|
|
GossipAggregateBatch {
|
|
aggregates: Vec<GossipAggregatePackage<E>>,
|
|
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
|
|
},
|
|
GossipBlock(AsyncFn),
|
|
GossipBlobSidecar(AsyncFn),
|
|
GossipDataColumnSidecar(AsyncFn),
|
|
DelayedImportBlock {
|
|
beacon_block_slot: Slot,
|
|
beacon_block_root: Hash256,
|
|
process_fn: AsyncFn,
|
|
},
|
|
GossipVoluntaryExit(BlockingFn),
|
|
GossipProposerSlashing(BlockingFn),
|
|
GossipAttesterSlashing(BlockingFn),
|
|
GossipSyncSignature(BlockingFn),
|
|
GossipSyncContribution(BlockingFn),
|
|
GossipLightClientFinalityUpdate(BlockingFn),
|
|
GossipLightClientOptimisticUpdate(BlockingFn),
|
|
RpcBlock {
|
|
process_fn: AsyncFn,
|
|
},
|
|
RpcBlobs {
|
|
process_fn: AsyncFn,
|
|
},
|
|
RpcCustodyColumn(AsyncFn),
|
|
ColumnReconstruction(AsyncFn),
|
|
IgnoredRpcBlock {
|
|
process_fn: BlockingFn,
|
|
},
|
|
ChainSegment(AsyncFn),
|
|
ChainSegmentBackfill(AsyncFn),
|
|
Status(BlockingFn),
|
|
BlocksByRangeRequest(AsyncFn),
|
|
BlocksByRootsRequest(AsyncFn),
|
|
BlobsByRangeRequest(BlockingFn),
|
|
BlobsByRootsRequest(BlockingFn),
|
|
DataColumnsByRootsRequest(BlockingFn),
|
|
DataColumnsByRangeRequest(BlockingFn),
|
|
GossipBlsToExecutionChange(BlockingFn),
|
|
LightClientBootstrapRequest(BlockingFn),
|
|
LightClientOptimisticUpdateRequest(BlockingFn),
|
|
LightClientFinalityUpdateRequest(BlockingFn),
|
|
LightClientUpdatesByRangeRequest(BlockingFn),
|
|
ApiRequestP0(BlockingOrAsync),
|
|
ApiRequestP1(BlockingOrAsync),
|
|
Reprocess(ReprocessQueueMessage),
|
|
}
|
|
|
|
impl<E: EthSpec> fmt::Debug for Work<E> {
    /// Writes only the snake_case name of the work type; the payload (mostly closures and
    /// futures) is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.str_id())
    }
}
|
|
|
|
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
|
|
#[strum(serialize_all = "snake_case")]
|
|
pub enum WorkType {
|
|
GossipAttestation,
|
|
GossipAttestationToConvert,
|
|
UnknownBlockAttestation,
|
|
GossipAttestationBatch,
|
|
GossipAggregate,
|
|
UnknownBlockAggregate,
|
|
UnknownLightClientOptimisticUpdate,
|
|
GossipAggregateBatch,
|
|
GossipBlock,
|
|
GossipBlobSidecar,
|
|
GossipDataColumnSidecar,
|
|
DelayedImportBlock,
|
|
GossipVoluntaryExit,
|
|
GossipProposerSlashing,
|
|
GossipAttesterSlashing,
|
|
GossipSyncSignature,
|
|
GossipSyncContribution,
|
|
GossipLightClientFinalityUpdate,
|
|
GossipLightClientOptimisticUpdate,
|
|
RpcBlock,
|
|
RpcBlobs,
|
|
RpcCustodyColumn,
|
|
ColumnReconstruction,
|
|
IgnoredRpcBlock,
|
|
ChainSegment,
|
|
ChainSegmentBackfill,
|
|
Status,
|
|
BlocksByRangeRequest,
|
|
BlocksByRootsRequest,
|
|
BlobsByRangeRequest,
|
|
BlobsByRootsRequest,
|
|
DataColumnsByRootsRequest,
|
|
DataColumnsByRangeRequest,
|
|
GossipBlsToExecutionChange,
|
|
LightClientBootstrapRequest,
|
|
LightClientOptimisticUpdateRequest,
|
|
LightClientFinalityUpdateRequest,
|
|
LightClientUpdatesByRangeRequest,
|
|
ApiRequestP0,
|
|
ApiRequestP1,
|
|
Reprocess,
|
|
}
|
|
|
|
impl<E: EthSpec> Work<E> {
|
|
fn str_id(&self) -> &'static str {
|
|
self.to_type().into()
|
|
}
|
|
|
|
/// Provides a `&str` that uniquely identifies each enum variant.
|
|
fn to_type(&self) -> WorkType {
|
|
match self {
|
|
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
|
|
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
|
|
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
|
|
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
|
|
Work::GossipBlock(_) => WorkType::GossipBlock,
|
|
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
|
|
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
|
|
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
|
|
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
|
|
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
|
|
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
|
|
Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature,
|
|
Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution,
|
|
Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate,
|
|
Work::GossipLightClientOptimisticUpdate(_) => {
|
|
WorkType::GossipLightClientOptimisticUpdate
|
|
}
|
|
Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
|
|
Work::RpcBlock { .. } => WorkType::RpcBlock,
|
|
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
|
|
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
|
|
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
|
|
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
|
|
Work::ChainSegment { .. } => WorkType::ChainSegment,
|
|
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
|
|
Work::Status(_) => WorkType::Status,
|
|
Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
|
|
Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
|
|
Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
|
|
Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
|
|
Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
|
|
Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
|
|
Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
|
|
Work::LightClientOptimisticUpdateRequest(_) => {
|
|
WorkType::LightClientOptimisticUpdateRequest
|
|
}
|
|
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
|
|
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
|
|
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
|
|
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
|
|
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
|
WorkType::UnknownLightClientOptimisticUpdate
|
|
}
|
|
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
|
|
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
|
|
Work::Reprocess { .. } => WorkType::Reprocess,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Unifies all the messages processed by the `BeaconProcessor`.
|
|
enum InboundEvent<E: EthSpec> {
|
|
/// A worker has completed a task and is free.
|
|
WorkerIdle,
|
|
/// There is new work to be done.
|
|
WorkEvent((WorkEvent<E>, Instant)),
|
|
/// A work event that was queued for re-processing has become ready.
|
|
ReprocessingWork((WorkEvent<E>, Instant)),
|
|
}
|
|
|
|
/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
|
|
///
|
|
/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
|
|
/// control (specifically in the ordering of event processing).
|
|
struct InboundEvents<E: EthSpec> {
|
|
/// Used by workers when they finish a task.
|
|
idle_rx: mpsc::Receiver<WorkType>,
|
|
/// Used by upstream processes to send new work to the `BeaconProcessor`.
|
|
event_rx: mpsc::Receiver<WorkEvent<E>>,
|
|
/// Used internally for queuing work ready to be re-processed.
|
|
ready_work_rx: mpsc::Receiver<ReadyWork>,
|
|
}
|
|
|
|
impl<E: EthSpec> Stream for InboundEvents<E> {
|
|
type Item = InboundEvent<E>;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
// Always check for idle workers before anything else. This allows us to ensure that a big
|
|
// stream of new events doesn't suppress the processing of existing events.
|
|
match self.idle_rx.poll_recv(cx) {
|
|
Poll::Ready(Some(_)) => {
|
|
return Poll::Ready(Some(InboundEvent::WorkerIdle));
|
|
}
|
|
Poll::Ready(None) => {
|
|
return Poll::Ready(None);
|
|
}
|
|
Poll::Pending => {}
|
|
}
|
|
|
|
// Poll for delayed blocks before polling for new work. It might be the case that a delayed
|
|
// block is required to successfully process some new work.
|
|
match self.ready_work_rx.poll_recv(cx) {
|
|
Poll::Ready(Some(ready_work)) => {
|
|
return Poll::Ready(Some(InboundEvent::ReprocessingWork((
|
|
ready_work.into(),
|
|
Instant::now(),
|
|
))));
|
|
}
|
|
Poll::Ready(None) => {
|
|
return Poll::Ready(None);
|
|
}
|
|
Poll::Pending => {}
|
|
}
|
|
|
|
match self.event_rx.poll_recv(cx) {
|
|
Poll::Ready(Some(event)) => {
|
|
return Poll::Ready(Some(InboundEvent::WorkEvent((event, Instant::now()))));
|
|
}
|
|
Poll::Ready(None) => {
|
|
return Poll::Ready(None);
|
|
}
|
|
Poll::Pending => {}
|
|
}
|
|
|
|
Poll::Pending
|
|
}
|
|
}
|
|
|
|
/// A mutli-threaded processor for messages received on the network
|
|
/// that need to be processed by the `BeaconChain`
|
|
///
|
|
/// See module level documentation for more information.
|
|
pub struct BeaconProcessor<E: EthSpec> {
|
|
pub network_globals: Arc<NetworkGlobals<E>>,
|
|
pub executor: TaskExecutor,
|
|
pub current_workers: usize,
|
|
pub config: BeaconProcessorConfig,
|
|
}
|
|
|
|
impl<E: EthSpec> BeaconProcessor<E> {
|
|
/// Spawns the "manager" task which checks the receiver end of the returned `Sender` for
|
|
/// messages which contain some new work which will be:
|
|
///
|
|
/// - Performed immediately, if a worker is available.
|
|
/// - Queued for later processing, if no worker is currently available.
|
|
///
|
|
/// Only `self.config.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
|
|
/// started with `spawn_blocking`.
|
|
///
|
|
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work
|
|
/// events processed by `self`. This should only be used during testing.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub fn spawn_manager<S: SlotClock + 'static>(
|
|
mut self,
|
|
event_rx: mpsc::Receiver<WorkEvent<E>>,
|
|
work_journal_tx: Option<mpsc::Sender<&'static str>>,
|
|
slot_clock: S,
|
|
maximum_gossip_clock_disparity: Duration,
|
|
queue_lengths: BeaconProcessorQueueLengths,
|
|
) -> Result<(), String> {
|
|
// Used by workers to communicate that they are finished a task.
|
|
let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);
|
|
|
|
// Using LIFO queues for attestations since validator profits rely upon getting fresh
|
|
// attestations into blocks. Additionally, later attestations contain more information than
|
|
// earlier ones, so we consider them more valuable.
|
|
let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
|
|
let mut aggregate_debounce = TimeLatch::default();
|
|
let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
|
|
let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue);
|
|
let mut attestation_debounce = TimeLatch::default();
|
|
let mut unknown_block_aggregate_queue =
|
|
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
|
|
let mut unknown_block_attestation_queue =
|
|
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
|
|
|
|
let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
|
|
let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
|
|
|
|
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
|
|
// a strong feeling about queue type for exits.
|
|
let mut gossip_voluntary_exit_queue =
|
|
FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);
|
|
|
|
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
|
|
// queues with lots of junk messages.
|
|
let mut gossip_proposer_slashing_queue =
|
|
FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
|
|
let mut gossip_attester_slashing_queue =
|
|
FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);
|
|
|
|
// Using a FIFO queue since blocks need to be imported sequentially.
|
|
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
|
|
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
|
|
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
|
|
let mut column_reconstruction_queue =
|
|
LifoQueue::new(queue_lengths.column_reconstruction_queue);
|
|
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
|
|
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
|
|
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
|
|
let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
|
|
let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
|
|
let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
|
|
|
|
let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
|
|
let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
|
|
let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
|
|
let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
|
|
let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
|
|
let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
|
|
let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
|
|
|
|
let mut gossip_bls_to_execution_change_queue =
|
|
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
|
|
|
|
// Using FIFO queues for light client updates to maintain sequence order.
|
|
let mut lc_gossip_finality_update_queue =
|
|
FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue);
|
|
let mut lc_gossip_optimistic_update_queue =
|
|
FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
|
|
let mut unknown_light_client_update_queue =
|
|
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
|
|
let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
|
|
let mut lc_rpc_optimistic_update_queue =
|
|
FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue);
|
|
let mut lc_rpc_finality_update_queue =
|
|
FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue);
|
|
let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue);
|
|
|
|
let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
|
|
let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);
|
|
|
|
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
|
// receive them back once they are ready (`ready_work_rx`).
|
|
let (ready_work_tx, ready_work_rx) =
|
|
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
|
|
|
|
let (reprocess_work_tx, reprocess_work_rx) =
|
|
mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);
|
|
|
|
spawn_reprocess_scheduler(
|
|
ready_work_tx,
|
|
reprocess_work_rx,
|
|
&self.executor,
|
|
Arc::new(slot_clock),
|
|
maximum_gossip_clock_disparity,
|
|
)?;
|
|
|
|
let executor = self.executor.clone();
|
|
|
|
// The manager future will run on the core executor and delegate tasks to worker
|
|
// threads on the blocking executor.
|
|
let manager_future = async move {
|
|
let mut inbound_events = InboundEvents {
|
|
idle_rx,
|
|
event_rx,
|
|
ready_work_rx,
|
|
};
|
|
|
|
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
|
|
|
|
loop {
|
|
let (work_event, created_timestamp) = match inbound_events.next().await {
|
|
Some(InboundEvent::WorkerIdle) => {
|
|
self.current_workers = self.current_workers.saturating_sub(1);
|
|
(None, Instant::now())
|
|
}
|
|
Some(InboundEvent::WorkEvent((event, created_timestamp)))
|
|
if enable_backfill_rate_limiting =>
|
|
{
|
|
match QueuedBackfillBatch::try_from(event) {
|
|
Ok(backfill_batch) => {
|
|
match reprocess_work_tx
|
|
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
|
|
{
|
|
Err(e) => {
|
|
warn!(
|
|
error = %e,
|
|
"Unable to queue backfill work event. Will try to process now."
|
|
);
|
|
match e {
|
|
TrySendError::Full(reprocess_queue_message)
|
|
| TrySendError::Closed(reprocess_queue_message) => {
|
|
match reprocess_queue_message {
|
|
ReprocessQueueMessage::BackfillSync(
|
|
backfill_batch,
|
|
) => (
|
|
Some(backfill_batch.into()),
|
|
created_timestamp,
|
|
),
|
|
other => {
|
|
crit!(
|
|
message_type = other.as_ref(),
|
|
"Unexpected queue message type"
|
|
);
|
|
// This is an unhandled exception, drop the message.
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(..) => {
|
|
// backfill work sent to "reprocessing" queue. Process the next event.
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
Err(event) => (Some(event), created_timestamp),
|
|
}
|
|
}
|
|
Some(InboundEvent::WorkEvent((event, created_timestamp)))
|
|
| Some(InboundEvent::ReprocessingWork((event, created_timestamp))) => {
|
|
(Some(event), created_timestamp)
|
|
}
|
|
None => {
|
|
debug!(msg = "stream ended", "Gossip processor stopped");
|
|
break;
|
|
}
|
|
};
|
|
|
|
let _event_timer =
|
|
metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS);
|
|
if let Some(event) = &work_event {
|
|
metrics::inc_counter_vec(
|
|
&metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT,
|
|
&[event.work.str_id()],
|
|
);
|
|
} else {
|
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL);
|
|
}
|
|
|
|
if let Some(work_journal_tx) = &work_journal_tx {
|
|
let id = work_event
|
|
.as_ref()
|
|
.map(|event| event.work.str_id())
|
|
.unwrap_or(WORKER_FREED);
|
|
|
|
// We don't care if this message was successfully sent, we only use the journal
|
|
// during testing. We also ignore reprocess messages to ensure our test cases can pass.
|
|
if id != "reprocess" {
|
|
let _ = work_journal_tx.try_send(id);
|
|
}
|
|
}
|
|
|
|
let can_spawn = self.current_workers < self.config.max_workers;
|
|
let drop_during_sync = work_event
|
|
.as_ref()
|
|
.is_some_and(|event| event.drop_during_sync);
|
|
|
|
let idle_tx = idle_tx.clone();
|
|
let modified_queue_id = match work_event {
|
|
// There is no new work event, but we are able to spawn a new worker.
|
|
//
|
|
// We don't check the `work.drop_during_sync` here. We assume that if it made
|
|
// it into the queue at any point then we should process it.
|
|
None if can_spawn => {
|
|
// Check for chain segments first, they're the most efficient way to get
|
|
// blocks into the system.
|
|
let work_event: Option<Work<E>> =
|
|
if let Some(item) = chain_segment_queue.pop() {
|
|
Some(item)
|
|
// Check sync blocks before gossip blocks, since we've already explicitly
|
|
// requested these blocks.
|
|
} else if let Some(item) = rpc_block_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = rpc_blob_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = rpc_custody_column_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = rpc_custody_column_queue.pop() {
|
|
Some(item)
|
|
// Check delayed blocks before gossip blocks, the gossip blocks might rely
|
|
// on the delayed ones.
|
|
} else if let Some(item) = delayed_block_queue.pop() {
|
|
Some(item)
|
|
// Check gossip blocks before gossip attestations, since a block might be
|
|
// required to verify some attestations.
|
|
} else if let Some(item) = gossip_block_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = gossip_blob_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = gossip_data_column_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = column_reconstruction_queue.pop() {
|
|
Some(item)
|
|
// Check the priority 0 API requests after blocks and blobs, but before attestations.
|
|
} else if let Some(item) = api_request_p0_queue.pop() {
|
|
Some(item)
|
|
// Check the aggregates, *then* the unaggregates since we assume that
|
|
// aggregates are more valuable to local validators and effectively give us
|
|
// more information with less signature verification time.
|
|
} else if aggregate_queue.len() > 0 {
|
|
let batch_size = cmp::min(
|
|
aggregate_queue.len(),
|
|
self.config.max_gossip_aggregate_batch_size,
|
|
);
|
|
|
|
if batch_size < 2 {
|
|
// One single aggregate is in the queue, process it individually.
|
|
aggregate_queue.pop()
|
|
} else {
|
|
// Collect two or more aggregates into a batch, so they can take
|
|
// advantage of batch signature verification.
|
|
//
|
|
// Note: this will convert the `Work::GossipAggregate` item into a
|
|
// `Work::GossipAggregateBatch` item.
|
|
let mut aggregates = Vec::with_capacity(batch_size);
|
|
let mut process_batch_opt = None;
|
|
for _ in 0..batch_size {
|
|
if let Some(item) = aggregate_queue.pop() {
|
|
match item {
|
|
Work::GossipAggregate {
|
|
aggregate,
|
|
process_individual: _,
|
|
process_batch,
|
|
} => {
|
|
aggregates.push(*aggregate);
|
|
if process_batch_opt.is_none() {
|
|
process_batch_opt = Some(process_batch);
|
|
}
|
|
}
|
|
_ => {
|
|
error!("Invalid item in aggregate queue");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(process_batch) = process_batch_opt {
|
|
// Process all aggregates with a single worker.
|
|
Some(Work::GossipAggregateBatch {
|
|
aggregates,
|
|
process_batch,
|
|
})
|
|
} else {
|
|
// There is no good reason for this to
|
|
// happen, it is a serious logic error.
|
|
// Since we only form batches when multiple
|
|
// work items exist, we should always have a
|
|
// work closure at this point.
|
|
crit!("Missing aggregate work");
|
|
None
|
|
}
|
|
}
|
|
// Check the unaggregated attestation queue.
|
|
//
|
|
// Potentially use batching.
|
|
} else if attestation_queue.len() > 0 {
|
|
let batch_size = cmp::min(
|
|
attestation_queue.len(),
|
|
self.config.max_gossip_attestation_batch_size,
|
|
);
|
|
|
|
if batch_size < 2 {
|
|
// One single attestation is in the queue, process it individually.
|
|
attestation_queue.pop()
|
|
} else {
|
|
// Collect two or more attestations into a batch, so they can take
|
|
// advantage of batch signature verification.
|
|
//
|
|
// Note: this will convert the `Work::GossipAttestation` item into a
|
|
// `Work::GossipAttestationBatch` item.
|
|
let mut attestations = Vec::with_capacity(batch_size);
|
|
let mut process_batch_opt = None;
|
|
for _ in 0..batch_size {
|
|
if let Some(item) = attestation_queue.pop() {
|
|
match item {
|
|
Work::GossipAttestation {
|
|
attestation,
|
|
process_individual: _,
|
|
process_batch,
|
|
} => {
|
|
attestations.push(*attestation);
|
|
if process_batch_opt.is_none() {
|
|
process_batch_opt = Some(process_batch);
|
|
}
|
|
}
|
|
_ => error!("Invalid item in attestation queue"),
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(process_batch) = process_batch_opt {
|
|
// Process all attestations with a single worker.
|
|
Some(Work::GossipAttestationBatch {
|
|
attestations,
|
|
process_batch,
|
|
})
|
|
} else {
|
|
// There is no good reason for this to
|
|
// happen, it is a serious logic error.
|
|
// Since we only form batches when multiple
|
|
// work items exist, we should always have a
|
|
// work closure at this point.
|
|
crit!("Missing attestations work");
|
|
None
|
|
}
|
|
}
|
|
// Convert any gossip attestations that need to be converted.
|
|
} else if let Some(item) = attestation_to_convert_queue.pop() {
|
|
Some(item)
|
|
// Check sync committee messages after attestations as their rewards are lesser
|
|
// and they don't influence fork choice.
|
|
} else if let Some(item) = sync_contribution_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = sync_message_queue.pop() {
|
|
Some(item)
|
|
// Aggregates and unaggregates queued for re-processing are older and we
|
|
// care about fresher ones, so check those first.
|
|
} else if let Some(item) = unknown_block_aggregate_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = unknown_block_attestation_queue.pop() {
|
|
Some(item)
|
|
// Check RPC methods next. Status messages are needed for sync so
|
|
// prioritize them over syncing requests from other peers (BlocksByRange
|
|
// and BlocksByRoot)
|
|
} else if let Some(item) = status_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = bbrange_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = bbroots_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = blbrange_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = blbroots_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = dcbroots_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = dcbrange_queue.pop() {
|
|
Some(item)
|
|
// Check slashings after all other consensus messages so we prioritize
|
|
// following head.
|
|
//
|
|
// Check attester slashings before proposer slashings since they have the
|
|
// potential to slash multiple validators at once.
|
|
} else if let Some(item) = gossip_attester_slashing_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = gossip_proposer_slashing_queue.pop() {
|
|
Some(item)
|
|
// Check exits and address changes late since our validators don't get
|
|
// rewards from them.
|
|
} else if let Some(item) = gossip_voluntary_exit_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
|
|
Some(item)
|
|
// Check the priority 1 API requests after we've
|
|
// processed all the interesting things from the network
|
|
// and things required for us to stay in good repute
|
|
// with our P2P peers.
|
|
} else if let Some(item) = api_request_p1_queue.pop() {
|
|
Some(item)
|
|
// Handle backfill sync chain segments.
|
|
} else if let Some(item) = backfill_chain_segment.pop() {
|
|
Some(item)
|
|
// Handle light client requests.
|
|
} else if let Some(item) = lc_gossip_finality_update_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = lc_gossip_optimistic_update_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = unknown_light_client_update_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = lc_bootstrap_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = lc_rpc_optimistic_update_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = lc_rpc_finality_update_queue.pop() {
|
|
Some(item)
|
|
} else if let Some(item) = lc_update_range_queue.pop() {
|
|
Some(item)
|
|
// This statement should always be the final else statement.
|
|
} else {
|
|
// Let the journal know that a worker is freed and there's nothing else
|
|
// for it to do.
|
|
if let Some(work_journal_tx) = &work_journal_tx {
|
|
// We don't care if this message was successfully sent, we only use the journal
|
|
// during testing.
|
|
let _ = work_journal_tx.try_send(NOTHING_TO_DO);
|
|
}
|
|
None
|
|
};
|
|
|
|
if let Some(work_event) = work_event {
|
|
let work_type = work_event.to_type();
|
|
self.spawn_worker(work_event, created_timestamp, idle_tx);
|
|
Some(work_type)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
// There is no new work event and we are unable to spawn a new worker.
|
|
//
|
|
// I cannot see any good reason why this would happen.
|
|
None => {
|
|
warn!(
|
|
msg = "no new work and cannot spawn worker",
|
|
"Unexpected gossip processor condition"
|
|
);
|
|
None
|
|
}
|
|
// The chain is syncing and this event should be dropped during sync.
|
|
Some(work_event)
|
|
if self.network_globals.sync_state.read().is_syncing()
|
|
&& drop_during_sync =>
|
|
{
|
|
let work_id = work_event.work.str_id();
|
|
metrics::inc_counter_vec(
|
|
&metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT,
|
|
&[work_id],
|
|
);
|
|
trace!(
|
|
msg = "chain is syncing",
|
|
work_id = work_id,
|
|
"Gossip processor skipping work"
|
|
);
|
|
None
|
|
}
|
|
// There is a new work event and the chain is not syncing. Process it or queue
|
|
// it.
|
|
Some(WorkEvent { work, .. }) => {
|
|
let work_id = work.str_id();
|
|
let work_type = work.to_type();
|
|
|
|
match work {
|
|
Work::Reprocess(work_event) => {
|
|
if let Err(e) = reprocess_work_tx.try_send(work_event) {
|
|
error!(
|
|
error = ?e,
|
|
"Failed to reprocess work event"
|
|
)
|
|
}
|
|
}
|
|
_ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
|
|
Work::GossipAttestation { .. } => attestation_queue.push(work),
|
|
// Attestation batches are formed internally within the
|
|
// `BeaconProcessor`, they are not sent from external services.
|
|
Work::GossipAttestationBatch { .. } => crit!(
|
|
work_type = "GossipAttestationBatch",
|
|
"Unsupported inbound event"
|
|
),
|
|
Work::GossipAggregate { .. } => aggregate_queue.push(work),
|
|
// Aggregate batches are formed internally within the `BeaconProcessor`,
|
|
// they are not sent from external services.
|
|
Work::GossipAggregateBatch { .. } => {
|
|
crit!(
|
|
work_type = "GossipAggregateBatch",
|
|
"Unsupported inbound event"
|
|
)
|
|
}
|
|
Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id),
|
|
Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id),
|
|
Work::GossipDataColumnSidecar { .. } => {
|
|
gossip_data_column_queue.push(work, work_id)
|
|
}
|
|
Work::DelayedImportBlock { .. } => {
|
|
delayed_block_queue.push(work, work_id)
|
|
}
|
|
Work::GossipVoluntaryExit { .. } => {
|
|
gossip_voluntary_exit_queue.push(work, work_id)
|
|
}
|
|
Work::GossipProposerSlashing { .. } => {
|
|
gossip_proposer_slashing_queue.push(work, work_id)
|
|
}
|
|
Work::GossipAttesterSlashing { .. } => {
|
|
gossip_attester_slashing_queue.push(work, work_id)
|
|
}
|
|
Work::GossipSyncSignature { .. } => sync_message_queue.push(work),
|
|
Work::GossipSyncContribution { .. } => {
|
|
sync_contribution_queue.push(work)
|
|
}
|
|
Work::GossipLightClientFinalityUpdate { .. } => {
|
|
lc_gossip_finality_update_queue.push(work, work_id)
|
|
}
|
|
Work::GossipLightClientOptimisticUpdate { .. } => {
|
|
lc_gossip_optimistic_update_queue.push(work, work_id)
|
|
}
|
|
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
|
|
rpc_block_queue.push(work, work_id)
|
|
}
|
|
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id),
|
|
Work::RpcCustodyColumn { .. } => {
|
|
rpc_custody_column_queue.push(work, work_id)
|
|
}
|
|
Work::ColumnReconstruction(_) => column_reconstruction_queue.push(work),
|
|
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
|
|
Work::ChainSegmentBackfill { .. } => {
|
|
backfill_chain_segment.push(work, work_id)
|
|
}
|
|
Work::Status { .. } => status_queue.push(work, work_id),
|
|
Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id),
|
|
Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id),
|
|
Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id),
|
|
Work::LightClientBootstrapRequest { .. } => {
|
|
lc_bootstrap_queue.push(work, work_id)
|
|
}
|
|
Work::LightClientOptimisticUpdateRequest { .. } => {
|
|
lc_rpc_optimistic_update_queue.push(work, work_id)
|
|
}
|
|
Work::LightClientFinalityUpdateRequest { .. } => {
|
|
lc_rpc_finality_update_queue.push(work, work_id)
|
|
}
|
|
Work::LightClientUpdatesByRangeRequest { .. } => {
|
|
lc_update_range_queue.push(work, work_id)
|
|
}
|
|
Work::UnknownBlockAttestation { .. } => {
|
|
unknown_block_attestation_queue.push(work)
|
|
}
|
|
Work::UnknownBlockAggregate { .. } => {
|
|
unknown_block_aggregate_queue.push(work)
|
|
}
|
|
Work::GossipBlsToExecutionChange { .. } => {
|
|
gossip_bls_to_execution_change_queue.push(work, work_id)
|
|
}
|
|
Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id),
|
|
Work::DataColumnsByRootsRequest { .. } => {
|
|
dcbroots_queue.push(work, work_id)
|
|
}
|
|
Work::DataColumnsByRangeRequest { .. } => {
|
|
dcbrange_queue.push(work, work_id)
|
|
}
|
|
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
|
unknown_light_client_update_queue.push(work, work_id)
|
|
}
|
|
Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id),
|
|
Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id),
|
|
};
|
|
Some(work_type)
|
|
}
|
|
};
|
|
|
|
if let Some(modified_queue_id) = modified_queue_id {
|
|
let queue_len = match modified_queue_id {
|
|
WorkType::GossipAttestation => attestation_queue.len(),
|
|
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
|
|
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
|
|
WorkType::GossipAttestationBatch => 0, // No queue
|
|
WorkType::GossipAggregate => aggregate_queue.len(),
|
|
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
|
|
WorkType::UnknownLightClientOptimisticUpdate => {
|
|
unknown_light_client_update_queue.len()
|
|
}
|
|
WorkType::GossipAggregateBatch => 0, // No queue
|
|
WorkType::GossipBlock => gossip_block_queue.len(),
|
|
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
|
|
WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
|
|
WorkType::DelayedImportBlock => delayed_block_queue.len(),
|
|
WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
|
|
WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
|
|
WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
|
|
WorkType::GossipSyncSignature => sync_message_queue.len(),
|
|
WorkType::GossipSyncContribution => sync_contribution_queue.len(),
|
|
WorkType::GossipLightClientFinalityUpdate => {
|
|
lc_gossip_finality_update_queue.len()
|
|
}
|
|
WorkType::GossipLightClientOptimisticUpdate => {
|
|
lc_gossip_optimistic_update_queue.len()
|
|
}
|
|
WorkType::RpcBlock => rpc_block_queue.len(),
|
|
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
|
|
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
|
|
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
|
|
WorkType::ChainSegment => chain_segment_queue.len(),
|
|
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
|
|
WorkType::Status => status_queue.len(),
|
|
WorkType::BlocksByRangeRequest => blbrange_queue.len(),
|
|
WorkType::BlocksByRootsRequest => blbroots_queue.len(),
|
|
WorkType::BlobsByRangeRequest => bbrange_queue.len(),
|
|
WorkType::BlobsByRootsRequest => bbroots_queue.len(),
|
|
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
|
|
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
|
|
WorkType::GossipBlsToExecutionChange => {
|
|
gossip_bls_to_execution_change_queue.len()
|
|
}
|
|
WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
|
|
WorkType::LightClientOptimisticUpdateRequest => {
|
|
lc_rpc_optimistic_update_queue.len()
|
|
}
|
|
WorkType::LightClientFinalityUpdateRequest => {
|
|
lc_rpc_finality_update_queue.len()
|
|
}
|
|
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
|
|
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
|
|
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
|
|
WorkType::Reprocess => 0,
|
|
};
|
|
metrics::observe_vec(
|
|
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
|
|
&[modified_queue_id.into()],
|
|
queue_len as f64,
|
|
);
|
|
}
|
|
|
|
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
|
|
error!(
|
|
msg = "the system has insufficient resources for load",
|
|
queue_len = aggregate_queue.max_length,
|
|
"Aggregate attestation queue full"
|
|
)
|
|
}
|
|
|
|
if attestation_queue.is_full() && attestation_debounce.elapsed() {
|
|
error!(
|
|
msg = "the system has insufficient resources for load",
|
|
queue_len = attestation_queue.max_length,
|
|
"Attestation queue full"
|
|
)
|
|
}
|
|
}
|
|
};
|
|
|
|
// Spawn on the core executor.
|
|
executor.spawn(manager_future, MANAGER_TASK_NAME);
|
|
Ok(())
|
|
}
|
|
|
|
/// Spawns a blocking worker thread to process some `Work`.
|
|
///
|
|
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
|
|
fn spawn_worker(
|
|
&mut self,
|
|
work: Work<E>,
|
|
created_timestamp: Instant,
|
|
idle_tx: mpsc::Sender<WorkType>,
|
|
) {
|
|
let work_id = work.str_id();
|
|
let work_type = work.to_type();
|
|
|
|
// This metric tracks how long a work event has been in the queue
|
|
metrics::observe_timer_vec(
|
|
&metrics::BEACON_PROCESSOR_QUEUE_TIME,
|
|
&[work_type.into()],
|
|
Instant::now() - created_timestamp,
|
|
);
|
|
|
|
let worker_timer =
|
|
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
|
|
metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
|
|
metrics::inc_counter_vec(
|
|
&metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT,
|
|
&[work.str_id()],
|
|
);
|
|
|
|
metrics::inc_gauge_vec(
|
|
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
|
|
&[work_id],
|
|
);
|
|
|
|
// Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
|
|
//
|
|
// This helps ensure that the worker is always freed in the case of an early exit or panic.
|
|
// As such, this instantiation should happen as early in the function as possible.
|
|
let send_idle_on_drop = SendOnDrop {
|
|
tx: idle_tx,
|
|
work_type: work.to_type(),
|
|
_worker_timer: worker_timer,
|
|
};
|
|
|
|
let worker_id = self.current_workers;
|
|
self.current_workers = self.current_workers.saturating_add(1);
|
|
|
|
let executor = self.executor.clone();
|
|
|
|
trace!(
|
|
work = work_id,
|
|
worker = worker_id,
|
|
"Spawning beacon processor worker"
|
|
);
|
|
|
|
let task_spawner = TaskSpawner {
|
|
executor,
|
|
send_idle_on_drop,
|
|
};
|
|
|
|
match work {
|
|
Work::GossipAttestation {
|
|
attestation,
|
|
process_individual,
|
|
process_batch: _,
|
|
} => task_spawner.spawn_blocking(move || {
|
|
process_individual(*attestation);
|
|
}),
|
|
Work::GossipAttestationBatch {
|
|
attestations,
|
|
process_batch,
|
|
} => task_spawner.spawn_blocking(move || {
|
|
process_batch(attestations);
|
|
}),
|
|
Work::GossipAggregate {
|
|
aggregate,
|
|
process_individual,
|
|
process_batch: _,
|
|
} => task_spawner.spawn_blocking(move || {
|
|
process_individual(*aggregate);
|
|
}),
|
|
Work::GossipAggregateBatch {
|
|
aggregates,
|
|
process_batch,
|
|
} => task_spawner.spawn_blocking(move || {
|
|
process_batch(aggregates);
|
|
}),
|
|
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
|
|
process_fn.await;
|
|
}),
|
|
Work::UnknownBlockAttestation { process_fn }
|
|
| Work::UnknownBlockAggregate { process_fn }
|
|
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
|
|
task_spawner.spawn_blocking(process_fn)
|
|
}
|
|
Work::DelayedImportBlock {
|
|
beacon_block_slot: _,
|
|
beacon_block_root: _,
|
|
process_fn,
|
|
} => task_spawner.spawn_async(process_fn),
|
|
Work::RpcBlock { process_fn }
|
|
| Work::RpcBlobs { process_fn }
|
|
| Work::RpcCustodyColumn(process_fn)
|
|
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
|
|
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
|
|
Work::GossipBlock(work)
|
|
| Work::GossipBlobSidecar(work)
|
|
| Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
|
|
work.await;
|
|
}),
|
|
Work::BlobsByRangeRequest(process_fn)
|
|
| Work::BlobsByRootsRequest(process_fn)
|
|
| Work::DataColumnsByRootsRequest(process_fn)
|
|
| Work::DataColumnsByRangeRequest(process_fn) => {
|
|
task_spawner.spawn_blocking(process_fn)
|
|
}
|
|
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
|
|
task_spawner.spawn_async(work)
|
|
}
|
|
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
|
|
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
|
|
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
|
|
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
|
|
},
|
|
Work::GossipVoluntaryExit(process_fn)
|
|
| Work::GossipProposerSlashing(process_fn)
|
|
| Work::GossipAttesterSlashing(process_fn)
|
|
| Work::GossipSyncSignature(process_fn)
|
|
| Work::GossipSyncContribution(process_fn)
|
|
| Work::GossipLightClientFinalityUpdate(process_fn)
|
|
| Work::GossipLightClientOptimisticUpdate(process_fn)
|
|
| Work::Status(process_fn)
|
|
| Work::GossipBlsToExecutionChange(process_fn)
|
|
| Work::LightClientBootstrapRequest(process_fn)
|
|
| Work::LightClientOptimisticUpdateRequest(process_fn)
|
|
| Work::LightClientFinalityUpdateRequest(process_fn)
|
|
| Work::LightClientUpdatesByRangeRequest(process_fn) => {
|
|
task_spawner.spawn_blocking(process_fn)
|
|
}
|
|
Work::Reprocess(_) => {}
|
|
};
|
|
}
|
|
}
|
|
|
|
/// Spawns tasks that are either:
|
|
///
|
|
/// - Blocking (i.e. intensive methods that shouldn't run on the core `tokio` executor)
|
|
/// - Async (i.e. `async` methods)
|
|
///
|
|
/// Takes a `SendOnDrop` and ensures it is dropped after the task completes. This frees the beacon
|
|
/// processor worker so a new task can be started.
|
|
struct TaskSpawner {
|
|
executor: TaskExecutor,
|
|
send_idle_on_drop: SendOnDrop,
|
|
}
|
|
|
|
impl TaskSpawner {
|
|
/// Spawn an async task, dropping the `SendOnDrop` after the task has completed.
|
|
fn spawn_async(self, task: impl Future<Output = ()> + Send + 'static) {
|
|
self.executor.spawn(
|
|
async {
|
|
task.await;
|
|
drop(self.send_idle_on_drop)
|
|
},
|
|
WORKER_TASK_NAME,
|
|
)
|
|
}
|
|
|
|
/// Spawn a blocking task, dropping the `SendOnDrop` after the task has completed.
|
|
fn spawn_blocking<F>(self, task: F)
|
|
where
|
|
F: FnOnce() + Send + 'static,
|
|
{
|
|
self.executor.spawn_blocking(
|
|
|| {
|
|
task();
|
|
drop(self.send_idle_on_drop)
|
|
},
|
|
WORKER_TASK_NAME,
|
|
)
|
|
}
|
|
}
|
|
|
|
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
|
|
/// if the send fails (this happens when the node is shutting down).
|
|
///
|
|
/// ## Purpose
|
|
///
|
|
/// This is useful for ensuring that a worker-freed message is still sent if a worker panics.
|
|
///
|
|
/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
|
|
///
|
|
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
|
|
pub struct SendOnDrop {
|
|
tx: mpsc::Sender<WorkType>,
|
|
work_type: WorkType,
|
|
// The field is unused, but it's here to ensure the timer is dropped once the task has finished.
|
|
_worker_timer: Option<metrics::HistogramTimer>,
|
|
}
|
|
|
|
impl Drop for SendOnDrop {
|
|
fn drop(&mut self) {
|
|
metrics::dec_gauge_vec(
|
|
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
|
|
&[self.work_type.clone().into()],
|
|
);
|
|
|
|
if let Err(e) = self.tx.try_send(self.work_type.clone()) {
|
|
warn!(
|
|
msg = "did not free worker, shutdown may be underway",
|
|
error = %e,
|
|
"Unable to free worker"
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec};

    /// Queue lengths derived from a state without validators must equal `MIN_QUEUE_LEN`.
    #[test]
    fn min_queue_len() {
        let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet());

        // A freshly created state (genesis time 0) has no validators.
        let empty_state = BeaconState::<MainnetEthSpec>::new(0, Eth1Data::default(), &spec);
        assert_eq!(empty_state.validators().len(), 0);

        let BeaconProcessorQueueLengths {
            attestation_queue,
            unknown_block_attestation_queue,
            ..
        } = BeaconProcessorQueueLengths::from_state(&empty_state, &spec).unwrap();

        assert_eq!(attestation_queue, MIN_QUEUE_LEN);
        assert_eq!(unknown_block_attestation_queue, MIN_QUEUE_LEN);
    }
}
|