mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-16 03:12:41 +00:00
Merge commit '1e029ce5384e911390a513e2d1885532f34a8b2b' into eip4844
This commit is contained in:
@@ -63,6 +63,7 @@ use std::time::Duration;
|
||||
use std::{cmp, collections::HashSet};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
|
||||
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
|
||||
@@ -80,7 +81,9 @@ mod tests;
|
||||
mod work_reprocessing_queue;
|
||||
mod worker;
|
||||
|
||||
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
|
||||
use crate::beacon_processor::work_reprocessing_queue::{
|
||||
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
|
||||
};
|
||||
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
|
||||
|
||||
/// The maximum size of the channel for work events to the `BeaconProcessor`.
|
||||
@@ -230,6 +233,7 @@ pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_upd
|
||||
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
|
||||
pub const RPC_BLOCK: &str = "rpc_block";
|
||||
pub const CHAIN_SEGMENT: &str = "chain_segment";
|
||||
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
|
||||
pub const STATUS_PROCESSING: &str = "status_processing";
|
||||
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
|
||||
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
||||
@@ -802,6 +806,9 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
ReadyWork::BackfillSync(QueuedBackfillBatch { process_id, blocks }) => {
|
||||
WorkEvent::chain_segment(process_id, blocks)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -975,6 +982,10 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
|
||||
Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
|
||||
Work::RpcBlock { .. } => RPC_BLOCK,
|
||||
Work::ChainSegment {
|
||||
process_id: ChainSegmentProcessId::BackSyncBatchId { .. },
|
||||
..
|
||||
} => CHAIN_SEGMENT_BACKFILL,
|
||||
Work::ChainSegment { .. } => CHAIN_SEGMENT,
|
||||
Work::Status { .. } => STATUS_PROCESSING,
|
||||
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
|
||||
@@ -1142,23 +1153,23 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
|
||||
|
||||
let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
|
||||
|
||||
let chain = match self.beacon_chain.upgrade() {
|
||||
Some(chain) => chain,
|
||||
// No need to proceed any further if the beacon chain has been dropped, the client
|
||||
// is shutting down.
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
||||
// receive them back once they are ready (`ready_work_rx`).
|
||||
let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
|
||||
let work_reprocessing_tx = {
|
||||
if let Some(chain) = self.beacon_chain.upgrade() {
|
||||
spawn_reprocess_scheduler(
|
||||
ready_work_tx,
|
||||
&self.executor,
|
||||
chain.slot_clock.clone(),
|
||||
self.log.clone(),
|
||||
)
|
||||
} else {
|
||||
// No need to proceed any further if the beacon chain has been dropped, the client
|
||||
// is shutting down.
|
||||
return;
|
||||
}
|
||||
};
|
||||
let work_reprocessing_tx = spawn_reprocess_scheduler(
|
||||
ready_work_tx,
|
||||
&self.executor,
|
||||
chain.slot_clock.clone(),
|
||||
self.log.clone(),
|
||||
);
|
||||
|
||||
let executor = self.executor.clone();
|
||||
|
||||
@@ -1171,12 +1182,55 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
reprocess_work_rx: ready_work_rx,
|
||||
};
|
||||
|
||||
let enable_backfill_rate_limiting = chain.config.enable_backfill_rate_limiting;
|
||||
|
||||
loop {
|
||||
let work_event = match inbound_events.next().await {
|
||||
Some(InboundEvent::WorkerIdle) => {
|
||||
self.current_workers = self.current_workers.saturating_sub(1);
|
||||
None
|
||||
}
|
||||
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
|
||||
match QueuedBackfillBatch::try_from(event) {
|
||||
Ok(backfill_batch) => {
|
||||
match work_reprocessing_tx
|
||||
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
|
||||
{
|
||||
Err(e) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Unable to queue backfill work event. Will try to process now.";
|
||||
"error" => %e
|
||||
);
|
||||
match e {
|
||||
TrySendError::Full(reprocess_queue_message)
|
||||
| TrySendError::Closed(reprocess_queue_message) => {
|
||||
match reprocess_queue_message {
|
||||
ReprocessQueueMessage::BackfillSync(
|
||||
backfill_batch,
|
||||
) => Some(backfill_batch.into()),
|
||||
other => {
|
||||
crit!(
|
||||
self.log,
|
||||
"Unexpected queue message type";
|
||||
"message_type" => other.as_ref()
|
||||
);
|
||||
// This is an unhandled exception, drop the message.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(..) => {
|
||||
// backfill work sent to "reprocessing" queue. Process the next event.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(event) => Some(event),
|
||||
}
|
||||
}
|
||||
Some(InboundEvent::WorkEvent(event))
|
||||
| Some(InboundEvent::ReprocessingWork(event)) => Some(event),
|
||||
None => {
|
||||
|
||||
Reference in New Issue
Block a user