Resolve merge conflicts

This commit is contained in:
Eitan Seri-Levi
2026-01-02 08:52:14 -06:00
918 changed files with 49304 additions and 37273 deletions

View File

@@ -39,14 +39,15 @@
//! task.
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::crit;
use logging::TimeLatch;
use logging::crit;
use parking_lot::Mutex;
pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
use std::cmp;
@@ -56,24 +57,24 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use task_executor::TaskExecutor;
use task_executor::{RayonPoolType, TaskExecutor};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, error, trace, warn};
use types::{
Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
SingleAttestation, Slot, SubnetId,
};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
spawn_reprocess_scheduler,
};
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};
mod metrics;
pub mod work_reprocessing_queue;
pub mod scheduler;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
@@ -111,12 +112,10 @@ pub struct BeaconProcessorQueueLengths {
gossip_proposer_slashing_queue: usize,
gossip_attester_slashing_queue: usize,
unknown_light_client_update_queue: usize,
unknown_block_sampling_request_queue: usize,
rpc_block_queue: usize,
rpc_blob_queue: usize,
rpc_custody_column_queue: usize,
rpc_verify_data_column_queue: usize,
sampling_result_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
@@ -124,10 +123,10 @@ pub struct BeaconProcessorQueueLengths {
gossip_data_column_queue: usize,
delayed_block_queue: usize,
status_queue: usize,
bbrange_queue: usize,
bbroots_queue: usize,
blbroots_queue: usize,
blbrange_queue: usize,
block_brange_queue: usize,
block_broots_queue: usize,
blob_broots_queue: usize,
blob_brange_queue: usize,
dcbroots_queue: usize,
dcbrange_queue: usize,
gossip_bls_to_execution_change_queue: usize,
@@ -180,11 +179,10 @@ impl BeaconProcessorQueueLengths {
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
// TODO(das): Placeholder values
rpc_custody_column_queue: 1000,
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
// We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit
// this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB.
rpc_custody_column_queue: 64,
column_reconstruction_queue: 1,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -192,11 +190,10 @@ impl BeaconProcessorQueueLengths {
gossip_data_column_queue: 1024,
delayed_block_queue: 1024,
status_queue: 1024,
bbrange_queue: 1024,
bbroots_queue: 1024,
blbroots_queue: 1024,
blbrange_queue: 1024,
// TODO(das): pick proper values
block_brange_queue: 1024,
block_broots_queue: 1024,
blob_broots_queue: 1024,
blob_brange_queue: 1024,
dcbroots_queue: 1024,
dcbrange_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
@@ -265,22 +262,16 @@ impl Default for BeaconProcessorConfig {
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}
impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_work_event_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}
@@ -493,14 +484,16 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
process_fn,
},
},
ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockSamplingRequest { process_fn },
},
ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
},
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
Self {
drop_during_sync: true,
work: Work::ColumnReconstruction(process_fn),
}
}
}
}
}
@@ -552,32 +545,23 @@ pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}
pub type GossipAttestationBatch<E> = Vec<GossipAttestationPackage<Attestation<E>>>;
pub type GossipAttestationBatch = Vec<GossipAttestationPackage<SingleAttestation>>;
/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: Box<GossipAttestationPackage<Attestation<E>>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<Attestation<E>>) + Send + Sync>,
process_batch: Box<dyn FnOnce(GossipAttestationBatch<E>) + Send + Sync>,
},
// Attestation requiring conversion before processing.
//
// For now this is a `SingleAttestation`, but eventually we will switch this around so that
// legacy `Attestation`s are converted and the main processing pipeline operates on
// `SingleAttestation`s.
GossipAttestationToConvert {
attestation: Box<GossipAttestationPackage<SingleAttestation>>,
process_individual:
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
UnknownBlockAttestation {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: GossipAttestationBatch<E>,
process_batch: Box<dyn FnOnce(GossipAttestationBatch<E>) + Send + Sync>,
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
GossipAggregate {
aggregate: Box<GossipAggregatePackage<E>>,
@@ -591,9 +575,6 @@ pub enum Work<E: EthSpec> {
parent_root: Hash256,
process_fn: BlockingFn,
},
UnknownBlockSamplingRequest {
process_fn: BlockingFn,
},
GossipAggregateBatch {
aggregates: Vec<GossipAggregatePackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
@@ -620,13 +601,12 @@ pub enum Work<E: EthSpec> {
process_fn: AsyncFn,
},
RpcCustodyColumn(AsyncFn),
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
ChainSegmentBackfill(BlockingFn),
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
@@ -642,6 +622,7 @@ pub enum Work<E: EthSpec> {
GossipInclusionList(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
Reprocess(ReprocessQueueMessage),
}
impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -650,7 +631,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
}
}
#[derive(IntoStaticStr, PartialEq, Eq, Debug)]
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
@@ -660,7 +641,6 @@ pub enum WorkType {
GossipAggregate,
UnknownBlockAggregate,
UnknownLightClientOptimisticUpdate,
UnknownBlockSamplingRequest,
GossipAggregateBatch,
GossipBlock,
GossipBlobSidecar,
@@ -676,8 +656,7 @@ pub enum WorkType {
RpcBlock,
RpcBlobs,
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
ColumnReconstruction,
IgnoredRpcBlock,
ChainSegment,
ChainSegmentBackfill,
@@ -696,6 +675,7 @@ pub enum WorkType {
GossipInclusionList,
ApiRequestP0,
ApiRequestP1,
Reprocess,
}
impl<E: EthSpec> Work<E> {
@@ -707,7 +687,6 @@ impl<E: EthSpec> Work<E> {
fn to_type(&self) -> WorkType {
match self {
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipAttestationToConvert { .. } => WorkType::GossipAttestationToConvert,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
@@ -728,8 +707,7 @@ impl<E: EthSpec> Work<E> {
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -748,13 +726,13 @@ impl<E: EthSpec> Work<E> {
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
}
Work::GossipInclusionList { .. } => WorkType::GossipInclusionList,
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
Work::Reprocess { .. } => WorkType::Reprocess,
}
}
}
@@ -764,9 +742,9 @@ enum InboundEvent<E: EthSpec> {
/// A worker has completed a task and is free.
WorkerIdle,
/// There is new work to be done.
WorkEvent(WorkEvent<E>),
WorkEvent((WorkEvent<E>, Instant)),
/// A work event that was queued for re-processing has become ready.
ReprocessingWork(WorkEvent<E>),
ReprocessingWork((WorkEvent<E>, Instant)),
}
/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
@@ -775,11 +753,11 @@ enum InboundEvent<E: EthSpec> {
/// control (specifically in the ordering of event processing).
struct InboundEvents<E: EthSpec> {
/// Used by workers when they finish a task.
idle_rx: mpsc::Receiver<()>,
idle_rx: mpsc::Receiver<WorkType>,
/// Used by upstream processes to send new work to the `BeaconProcessor`.
event_rx: mpsc::Receiver<WorkEvent<E>>,
/// Used internally for queuing work ready to be re-processed.
reprocess_work_rx: mpsc::Receiver<ReadyWork>,
ready_work_rx: mpsc::Receiver<ReadyWork>,
}
impl<E: EthSpec> Stream for InboundEvents<E> {
@@ -789,7 +767,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
// Always check for idle workers before anything else. This allows us to ensure that a big
// stream of new events doesn't suppress the processing of existing events.
match self.idle_rx.poll_recv(cx) {
Poll::Ready(Some(())) => {
Poll::Ready(Some(_)) => {
return Poll::Ready(Some(InboundEvent::WorkerIdle));
}
Poll::Ready(None) => {
@@ -800,9 +778,12 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
// Poll for delayed blocks before polling for new work. It might be the case that a delayed
// block is required to successfully process some new work.
match self.reprocess_work_rx.poll_recv(cx) {
match self.ready_work_rx.poll_recv(cx) {
Poll::Ready(Some(ready_work)) => {
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
return Poll::Ready(Some(InboundEvent::ReprocessingWork((
ready_work.into(),
Instant::now(),
))));
}
Poll::Ready(None) => {
return Poll::Ready(None);
@@ -812,7 +793,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
match self.event_rx.poll_recv(cx) {
Poll::Ready(Some(event)) => {
return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
return Poll::Ready(Some(InboundEvent::WorkEvent((event, Instant::now()))));
}
Poll::Ready(None) => {
return Poll::Ready(None);
@@ -851,15 +832,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
pub fn spawn_manager<S: SlotClock + 'static>(
mut self,
event_rx: mpsc::Receiver<WorkEvent<E>>,
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
work_journal_tx: Option<mpsc::Sender<&'static str>>,
slot_clock: S,
maximum_gossip_clock_disparity: Duration,
queue_lengths: BeaconProcessorQueueLengths,
) -> Result<(), String> {
// Used by workers to communicate that they are finished a task.
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);
// Using LIFO queues for attestations since validator profits rely upon getting fresh
// attestations into blocks. Additionally, later attestations contain more information than
@@ -893,12 +872,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let mut rpc_verify_data_column_queue =
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut column_reconstruction_queue =
LifoQueue::new(queue_lengths.column_reconstruction_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
@@ -907,10 +882,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
let mut block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
let mut block_broots_queue = FifoQueue::new(queue_lengths.block_broots_queue);
let mut blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue);
let mut blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue);
let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
@@ -940,9 +915,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
let (reprocess_work_tx, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
reprocess_work_rx,
&self.executor,
Arc::new(slot_clock),
maximum_gossip_clock_disparity,
@@ -956,21 +935,23 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut inbound_events = InboundEvents {
idle_rx,
event_rx,
reprocess_work_rx: ready_work_rx,
ready_work_rx,
};
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
loop {
let work_event = match inbound_events.next().await {
let (work_event, created_timestamp) = match inbound_events.next().await {
Some(InboundEvent::WorkerIdle) => {
self.current_workers = self.current_workers.saturating_sub(1);
None
(None, Instant::now())
}
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
Some(InboundEvent::WorkEvent((event, created_timestamp)))
if enable_backfill_rate_limiting =>
{
match QueuedBackfillBatch::try_from(event) {
Ok(backfill_batch) => {
match work_reprocessing_tx
match reprocess_work_tx
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
{
Err(e) => {
@@ -984,7 +965,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
match reprocess_queue_message {
ReprocessQueueMessage::BackfillSync(
backfill_batch,
) => Some(backfill_batch.into()),
) => (
Some(backfill_batch.into()),
created_timestamp,
),
other => {
crit!(
message_type = other.as_ref(),
@@ -1003,11 +987,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
}
}
Err(event) => Some(event),
Err(event) => (Some(event), created_timestamp),
}
}
Some(InboundEvent::WorkEvent(event))
| Some(InboundEvent::ReprocessingWork(event)) => Some(event),
Some(InboundEvent::WorkEvent((event, created_timestamp)))
| Some(InboundEvent::ReprocessingWork((event, created_timestamp))) => {
(Some(event), created_timestamp)
}
None => {
debug!(msg = "stream ended", "Gossip processor stopped");
break;
@@ -1032,8 +1018,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
.unwrap_or(WORKER_FREED);
// We don't care if this message was successfully sent, we only use the journal
// during testing.
let _ = work_journal_tx.try_send(id);
// during testing. We also ignore reprocess messages to ensure our test cases can pass.
if id != "reprocess" {
let _ = work_journal_tx.try_send(id);
}
}
let can_spawn = self.current_workers < self.config.max_workers;
@@ -1061,13 +1049,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = rpc_custody_column_queue.pop() {
Some(item)
// TODO(das): decide proper prioritization for sampling columns
} else if let Some(item) = rpc_custody_column_queue.pop() {
Some(item)
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
Some(item)
} else if let Some(item) = sampling_result_queue.pop() {
Some(item)
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
@@ -1080,6 +1063,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = gossip_data_column_queue.pop() {
Some(item)
} else if let Some(item) = column_reconstruction_queue.pop() {
Some(item)
// Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
Some(item)
@@ -1215,21 +1200,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
// and BlocksByRoot)
} else if let Some(item) = status_queue.pop() {
Some(item)
} else if let Some(item) = bbrange_queue.pop() {
} else if let Some(item) = block_brange_queue.pop() {
Some(item)
} else if let Some(item) = bbroots_queue.pop() {
} else if let Some(item) = block_broots_queue.pop() {
Some(item)
} else if let Some(item) = blbrange_queue.pop() {
} else if let Some(item) = blob_brange_queue.pop() {
Some(item)
} else if let Some(item) = blbroots_queue.pop() {
} else if let Some(item) = blob_broots_queue.pop() {
Some(item)
} else if let Some(item) = dcbroots_queue.pop() {
Some(item)
} else if let Some(item) = dcbrange_queue.pop() {
Some(item)
// Prioritize sampling requests after block syncing requests
} else if let Some(item) = unknown_block_sampling_request_queue.pop() {
Some(item)
// Check slashings after all other consensus messages so we prioritize
// following head.
//
@@ -1283,7 +1265,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
if let Some(work_event) = work_event {
let work_type = work_event.to_type();
self.spawn_worker(work_event, idle_tx);
self.spawn_worker(work_event, created_timestamp, idle_tx);
Some(work_type)
} else {
None
@@ -1323,11 +1305,16 @@ impl<E: EthSpec> BeaconProcessor<E> {
let work_type = work.to_type();
match work {
_ if can_spawn => self.spawn_worker(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
Work::GossipAttestationToConvert { .. } => {
attestation_to_convert_queue.push(work)
Work::Reprocess(work_event) => {
if let Err(e) = reprocess_work_tx.try_send(work_event) {
error!(
error = ?e,
"Failed to reprocess work event"
)
}
}
_ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
@@ -1351,6 +1338,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::DelayedImportBlock { .. } => {
delayed_block_queue.push(work, work_id)
}
Work::GossipInclusionList { .. } => {
gossip_inclusion_list_queue.push(work, work_id)
}
Work::GossipVoluntaryExit { .. } => {
gossip_voluntary_exit_queue.push(work, work_id)
}
@@ -1377,18 +1367,21 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::RpcCustodyColumn { .. } => {
rpc_custody_column_queue.push(work, work_id)
}
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id)
}
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
Work::ColumnReconstruction(_) => column_reconstruction_queue.push(work),
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
Work::ChainSegmentBackfill { .. } => {
backfill_chain_segment.push(work, work_id)
}
Work::Status { .. } => status_queue.push(work, work_id),
Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id),
Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id),
Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id),
Work::BlocksByRangeRequest { .. } => {
block_brange_queue.push(work, work_id)
}
Work::BlocksByRootsRequest { .. } => {
block_broots_queue.push(work, work_id)
}
Work::BlobsByRangeRequest { .. } => {
blob_brange_queue.push(work, work_id)
}
Work::LightClientBootstrapRequest { .. } => {
lc_bootstrap_queue.push(work, work_id)
}
@@ -1410,7 +1403,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::GossipBlsToExecutionChange { .. } => {
gossip_bls_to_execution_change_queue.push(work, work_id)
}
Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id),
Work::BlobsByRootsRequest { .. } => {
blob_broots_queue.push(work, work_id)
}
Work::DataColumnsByRootsRequest { .. } => {
dcbroots_queue.push(work, work_id)
}
@@ -1420,12 +1415,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id)
}
Work::UnknownBlockSamplingRequest { .. } => {
unknown_block_sampling_request_queue.push(work, work_id)
}
Work::GossipInclusionList { .. } => {
gossip_inclusion_list_queue.push(work, work_id)
}
Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id),
Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id),
};
@@ -1433,11 +1422,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
};
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL,
self.current_workers as i64,
);
if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
WorkType::GossipAttestation => attestation_queue.len(),
@@ -1449,9 +1433,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::UnknownLightClientOptimisticUpdate => {
unknown_light_client_update_queue.len()
}
WorkType::UnknownBlockSamplingRequest => {
unknown_block_sampling_request_queue.len()
}
WorkType::GossipAggregateBatch => 0, // No queue
WorkType::GossipBlock => gossip_block_queue.len(),
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
@@ -1471,15 +1452,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
WorkType::BlocksByRangeRequest => blbrange_queue.len(),
WorkType::BlocksByRootsRequest => blbroots_queue.len(),
WorkType::BlobsByRangeRequest => bbrange_queue.len(),
WorkType::BlobsByRootsRequest => bbroots_queue.len(),
WorkType::BlocksByRangeRequest => block_brange_queue.len(),
WorkType::BlocksByRootsRequest => block_broots_queue.len(),
WorkType::BlobsByRangeRequest => blob_brange_queue.len(),
WorkType::BlobsByRootsRequest => blob_broots_queue.len(),
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
WorkType::GossipBlsToExecutionChange => {
@@ -1496,6 +1476,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
@@ -1530,8 +1511,22 @@ impl<E: EthSpec> BeaconProcessor<E> {
/// Spawns a blocking worker thread to process some `Work`.
///
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
fn spawn_worker(&mut self, work: Work<E>, idle_tx: mpsc::Sender<()>) {
fn spawn_worker(
&mut self,
work: Work<E>,
created_timestamp: Instant,
idle_tx: mpsc::Sender<WorkType>,
) {
let work_id = work.str_id();
let work_type = work.to_type();
// This metric tracks how long a work event has been in the queue
metrics::observe_timer_vec(
&metrics::BEACON_PROCESSOR_QUEUE_TIME,
&[work_type.into()],
Instant::now() - created_timestamp,
);
let worker_timer =
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
@@ -1540,12 +1535,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
&[work.str_id()],
);
metrics::inc_gauge_vec(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
&[work_id],
);
// Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
//
// This helps ensure that the worker is always freed in the case of an early exit or panic.
// As such, this instantiation should happen as early in the function as possible.
let send_idle_on_drop = SendOnDrop {
tx: idle_tx,
work_type: work.to_type(),
_worker_timer: worker_timer,
};
@@ -1573,12 +1574,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipAttestationToConvert {
attestation,
process_individual,
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
process_batch,
@@ -1603,8 +1598,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. }
| Work::UnknownBlockSamplingRequest { process_fn } => {
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
task_spawner.spawn_blocking(process_fn)
}
Work::DelayedImportBlock {
@@ -1615,8 +1609,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
@@ -1632,7 +1625,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ChainSegmentBackfill(process_fn) => {
if self.config.enable_backfill_rate_limiting {
task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
} else {
// use the global rayon thread pool if backfill rate limiting is disabled.
task_spawner.spawn_blocking(process_fn)
}
}
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
@@ -1646,11 +1646,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::GossipLightClientOptimisticUpdate(process_fn)
| Work::Status(process_fn)
| Work::GossipBlsToExecutionChange(process_fn)
| Work::GossipInclusionList(process_fn)
| Work::LightClientBootstrapRequest(process_fn)
| Work::LightClientOptimisticUpdateRequest(process_fn)
| Work::LightClientFinalityUpdateRequest(process_fn)
| Work::LightClientUpdatesByRangeRequest(process_fn)
| Work::GossipInclusionList(process_fn) => task_spawner.spawn_blocking(process_fn),
| Work::LightClientUpdatesByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::Reprocess(_) => {}
};
}
}
@@ -1692,6 +1695,21 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
fn spawn_blocking_with_rayon<F>(self, rayon_pool_type: RayonPoolType, task: F)
where
F: FnOnce() + Send + 'static,
{
self.executor.spawn_blocking_with_rayon(
move || {
task();
drop(self.send_idle_on_drop)
},
rayon_pool_type,
WORKER_TASK_NAME,
)
}
}
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
@@ -1705,14 +1723,20 @@ impl TaskSpawner {
///
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
pub struct SendOnDrop {
tx: mpsc::Sender<()>,
tx: mpsc::Sender<WorkType>,
work_type: WorkType,
// The field is unused, but it's here to ensure the timer is dropped once the task has finished.
_worker_timer: Option<metrics::HistogramTimer>,
}
impl Drop for SendOnDrop {
fn drop(&mut self) {
if let Err(e) = self.tx.try_send(()) {
metrics::dec_gauge_vec(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
&[self.work_type.clone().into()],
);
if let Err(e) = self.tx.try_send(self.work_type.clone()) {
warn!(
msg = "did not free worker, shutdown may be underway",
error = %e,