mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 08:52:54 +00:00
Make beacon processor queue sizes dynamic (#5573)
* Make beacon processor queue sizes dynamic * Update tests * lint * Review PR
This commit is contained in:
@@ -60,7 +60,9 @@ use std::time::Duration;
|
|||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
|
use types::{
|
||||||
|
Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId,
|
||||||
|
};
|
||||||
use types::{EthSpec, Slot};
|
use types::{EthSpec, Slot};
|
||||||
use work_reprocessing_queue::IgnoredRpcBlock;
|
use work_reprocessing_queue::IgnoredRpcBlock;
|
||||||
use work_reprocessing_queue::{
|
use work_reprocessing_queue::{
|
||||||
@@ -85,123 +87,98 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384;
|
|||||||
/// The maximum size of the channel for re-processing work events.
|
/// The maximum size of the channel for re-processing work events.
|
||||||
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;
|
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;
|
||||||
|
|
||||||
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
|
/// Over-provision queues based on active validator count by some factor. The beacon chain has
|
||||||
/// them.
|
/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning
|
||||||
const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;
|
/// slightly, we don't need to adjust the queues during the lifetime of a process.
|
||||||
|
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;
|
||||||
|
|
||||||
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
|
/// Maximum number of queued items that will be stored before dropping them
|
||||||
/// them.
|
pub struct BeaconProcessorQueueLengths {
|
||||||
const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192;
|
aggregate_queue: usize,
|
||||||
|
attestation_queue: usize,
|
||||||
|
unknown_block_aggregate_queue: usize,
|
||||||
|
unknown_block_attestation_queue: usize,
|
||||||
|
sync_message_queue: usize,
|
||||||
|
sync_contribution_queue: usize,
|
||||||
|
gossip_voluntary_exit_queue: usize,
|
||||||
|
gossip_proposer_slashing_queue: usize,
|
||||||
|
gossip_attester_slashing_queue: usize,
|
||||||
|
finality_update_queue: usize,
|
||||||
|
optimistic_update_queue: usize,
|
||||||
|
unknown_light_client_update_queue: usize,
|
||||||
|
rpc_block_queue: usize,
|
||||||
|
rpc_blob_queue: usize,
|
||||||
|
chain_segment_queue: usize,
|
||||||
|
backfill_chain_segment: usize,
|
||||||
|
gossip_block_queue: usize,
|
||||||
|
gossip_blob_queue: usize,
|
||||||
|
delayed_block_queue: usize,
|
||||||
|
status_queue: usize,
|
||||||
|
bbrange_queue: usize,
|
||||||
|
bbroots_queue: usize,
|
||||||
|
blbroots_queue: usize,
|
||||||
|
blbrange_queue: usize,
|
||||||
|
gossip_bls_to_execution_change_queue: usize,
|
||||||
|
lc_bootstrap_queue: usize,
|
||||||
|
lc_optimistic_update_queue: usize,
|
||||||
|
lc_finality_update_queue: usize,
|
||||||
|
api_request_p0_queue: usize,
|
||||||
|
api_request_p1_queue: usize,
|
||||||
|
}
|
||||||
|
|
||||||
/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
|
impl BeaconProcessorQueueLengths {
|
||||||
/// start dropping them.
|
pub fn from_state<E: EthSpec>(
|
||||||
const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 4_096;
|
state: &BeaconState<E>,
|
||||||
|
spec: &ChainSpec,
|
||||||
|
) -> Result<Self, String> {
|
||||||
|
let active_validator_count =
|
||||||
|
match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
|
||||||
|
Ok(indices) => indices.len(),
|
||||||
|
Err(_) => state
|
||||||
|
.get_active_validator_indices(state.current_epoch(), spec)
|
||||||
|
.map_err(|e| format!("Error computing active indices: {:?}", e))?
|
||||||
|
.len(),
|
||||||
|
};
|
||||||
|
let active_validator_count =
|
||||||
|
(ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
|
||||||
|
let slots_per_epoch = E::slots_per_epoch() as usize;
|
||||||
|
|
||||||
/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
|
Ok(Self {
|
||||||
/// start dropping them.
|
aggregate_queue: 4096,
|
||||||
const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;
|
unknown_block_aggregate_queue: 1024,
|
||||||
|
// Capacity for a full slot's worth of attestations if subscribed to all subnets
|
||||||
/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
|
attestation_queue: active_validator_count / slots_per_epoch,
|
||||||
/// before we start dropping them.
|
// Capacity for a full slot's worth of attestations if subscribed to all subnets
|
||||||
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
|
unknown_block_attestation_queue: active_validator_count / slots_per_epoch,
|
||||||
|
sync_message_queue: 2048,
|
||||||
/// The maximum number of queued `BlobSidecar` objects received on gossip that
|
sync_contribution_queue: 1024,
|
||||||
/// will be stored before we start dropping them.
|
gossip_voluntary_exit_queue: 4096,
|
||||||
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;
|
gossip_proposer_slashing_queue: 4096,
|
||||||
|
gossip_attester_slashing_queue: 4096,
|
||||||
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
|
finality_update_queue: 1024,
|
||||||
/// within acceptable clock disparity) that will be queued before we start dropping them.
|
optimistic_update_queue: 1024,
|
||||||
const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024;
|
unknown_light_client_update_queue: 128,
|
||||||
|
rpc_block_queue: 1024,
|
||||||
/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored
|
rpc_blob_queue: 1024,
|
||||||
/// before we start dropping them.
|
chain_segment_queue: 64,
|
||||||
const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096;
|
backfill_chain_segment: 64,
|
||||||
|
gossip_block_queue: 1024,
|
||||||
/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored
|
gossip_blob_queue: 1024,
|
||||||
/// before we start dropping them.
|
delayed_block_queue: 1024,
|
||||||
const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;
|
status_queue: 1024,
|
||||||
|
bbrange_queue: 1024,
|
||||||
/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored
|
bbroots_queue: 1024,
|
||||||
/// before we start dropping them.
|
blbroots_queue: 1024,
|
||||||
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;
|
blbrange_queue: 1024,
|
||||||
|
gossip_bls_to_execution_change_queue: 16384,
|
||||||
/// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored
|
lc_bootstrap_queue: 1024,
|
||||||
/// before we start dropping them.
|
lc_optimistic_update_queue: 512,
|
||||||
const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
|
lc_finality_update_queue: 512,
|
||||||
|
api_request_p0_queue: 1024,
|
||||||
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
|
api_request_p1_queue: 1024,
|
||||||
/// before we start dropping them.
|
})
|
||||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
|
}
|
||||||
|
}
|
||||||
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
|
|
||||||
/// for reprocessing before we start dropping them.
|
|
||||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;
|
|
||||||
|
|
||||||
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
|
|
||||||
/// them.
|
|
||||||
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
|
|
||||||
|
|
||||||
/// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we
|
|
||||||
/// start dropping them.
|
|
||||||
const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `BlobSidecar` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
|
|
||||||
/// be stored before we start dropping them.
|
|
||||||
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
|
|
||||||
|
|
||||||
/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be
|
|
||||||
/// stored before we start dropping them.
|
|
||||||
const MAX_STATUS_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `BlobsByRootRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them.
|
|
||||||
///
|
|
||||||
/// This value is set high to accommodate the large spike that is expected immediately after Capella
|
|
||||||
/// is activated.
|
|
||||||
const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
|
|
||||||
|
|
||||||
/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of queued `LightClientOptimisticUpdateRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 512;
|
|
||||||
|
|
||||||
/// The maximum number of queued `LightClientFinalityUpdateRequest` objects received from the network RPC that
|
|
||||||
/// will be stored before we start dropping them.
|
|
||||||
const MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN: usize = 512;
|
|
||||||
|
|
||||||
/// The maximum number of priority-0 (highest priority) messages that will be queued before
|
|
||||||
/// they begin to be dropped.
|
|
||||||
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
|
|
||||||
/// they begin to be dropped.
|
|
||||||
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;
|
|
||||||
|
|
||||||
/// The name of the manager tokio task.
|
/// The name of the manager tokio task.
|
||||||
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
|
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
|
||||||
@@ -772,6 +749,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
///
|
///
|
||||||
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work
|
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work
|
||||||
/// events processed by `self`. This should only be used during testing.
|
/// events processed by `self`. This should only be used during testing.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn spawn_manager<S: SlotClock + 'static>(
|
pub fn spawn_manager<S: SlotClock + 'static>(
|
||||||
mut self,
|
mut self,
|
||||||
event_rx: mpsc::Receiver<WorkEvent<E>>,
|
event_rx: mpsc::Receiver<WorkEvent<E>>,
|
||||||
@@ -780,6 +758,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
work_journal_tx: Option<mpsc::Sender<&'static str>>,
|
work_journal_tx: Option<mpsc::Sender<&'static str>>,
|
||||||
slot_clock: S,
|
slot_clock: S,
|
||||||
maximum_gossip_clock_disparity: Duration,
|
maximum_gossip_clock_disparity: Duration,
|
||||||
|
queue_lengths: BeaconProcessorQueueLengths,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
// Used by workers to communicate that they are finished a task.
|
// Used by workers to communicate that they are finished a task.
|
||||||
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
|
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
|
||||||
@@ -787,61 +766,61 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
// Using LIFO queues for attestations since validator profits rely upon getting fresh
|
// Using LIFO queues for attestations since validator profits rely upon getting fresh
|
||||||
// attestations into blocks. Additionally, later attestations contain more information than
|
// attestations into blocks. Additionally, later attestations contain more information than
|
||||||
// earlier ones, so we consider them more valuable.
|
// earlier ones, so we consider them more valuable.
|
||||||
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
|
let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
|
||||||
let mut aggregate_debounce = TimeLatch::default();
|
let mut aggregate_debounce = TimeLatch::default();
|
||||||
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
|
let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
|
||||||
let mut attestation_debounce = TimeLatch::default();
|
let mut attestation_debounce = TimeLatch::default();
|
||||||
let mut unknown_block_aggregate_queue =
|
let mut unknown_block_aggregate_queue =
|
||||||
LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
|
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
|
||||||
let mut unknown_block_attestation_queue =
|
let mut unknown_block_attestation_queue =
|
||||||
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
|
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
|
||||||
|
|
||||||
let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
|
let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
|
||||||
let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);
|
let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
|
||||||
|
|
||||||
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
|
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
|
||||||
// a strong feeling about queue type for exits.
|
// a strong feeling about queue type for exits.
|
||||||
let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);
|
let mut gossip_voluntary_exit_queue =
|
||||||
|
FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);
|
||||||
|
|
||||||
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
|
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
|
||||||
// queues with lots of junk messages.
|
// queues with lots of junk messages.
|
||||||
let mut gossip_proposer_slashing_queue =
|
let mut gossip_proposer_slashing_queue =
|
||||||
FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
|
FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
|
||||||
let mut gossip_attester_slashing_queue =
|
let mut gossip_attester_slashing_queue =
|
||||||
FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);
|
FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);
|
||||||
|
|
||||||
// Using a FIFO queue for light client updates to maintain sequence order.
|
// Using a FIFO queue for light client updates to maintain sequence order.
|
||||||
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
|
let mut finality_update_queue = FifoQueue::new(queue_lengths.finality_update_queue);
|
||||||
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
|
let mut optimistic_update_queue = FifoQueue::new(queue_lengths.optimistic_update_queue);
|
||||||
let mut unknown_light_client_update_queue =
|
let mut unknown_light_client_update_queue =
|
||||||
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
|
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
|
||||||
|
|
||||||
// Using a FIFO queue since blocks need to be imported sequentially.
|
// Using a FIFO queue since blocks need to be imported sequentially.
|
||||||
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
|
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
|
||||||
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
|
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
|
||||||
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
|
||||||
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
|
||||||
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
|
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
|
||||||
let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
|
let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
|
||||||
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
|
let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
|
||||||
|
|
||||||
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
|
let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
|
||||||
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
|
let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
|
||||||
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
|
let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
|
||||||
let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN);
|
let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
|
||||||
let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN);
|
let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
|
||||||
|
|
||||||
let mut gossip_bls_to_execution_change_queue =
|
let mut gossip_bls_to_execution_change_queue =
|
||||||
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
|
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
|
||||||
|
|
||||||
let mut lc_bootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
|
let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
|
||||||
let mut lc_optimistic_update_queue =
|
let mut lc_optimistic_update_queue =
|
||||||
FifoQueue::new(MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN);
|
FifoQueue::new(queue_lengths.lc_optimistic_update_queue);
|
||||||
let mut lc_finality_update_queue =
|
let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue);
|
||||||
FifoQueue::new(MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN);
|
|
||||||
|
|
||||||
let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
|
let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
|
||||||
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);
|
let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);
|
||||||
|
|
||||||
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
||||||
// receive them back once they are ready (`ready_work_rx`).
|
// receive them back once they are ready (`ready_work_rx`).
|
||||||
|
|||||||
@@ -19,8 +19,8 @@ use beacon_chain::{
|
|||||||
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
|
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
|
||||||
BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
|
BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
|
||||||
};
|
};
|
||||||
use beacon_processor::BeaconProcessorConfig;
|
|
||||||
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
|
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
|
||||||
|
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use eth1::{Config as Eth1Config, Service as Eth1Service};
|
use eth1::{Config as Eth1Config, Service as Eth1Service};
|
||||||
use eth2::{
|
use eth2::{
|
||||||
@@ -884,6 +884,14 @@ where
|
|||||||
None,
|
None,
|
||||||
beacon_chain.slot_clock.clone(),
|
beacon_chain.slot_clock.clone(),
|
||||||
beacon_chain.spec.maximum_gossip_clock_disparity(),
|
beacon_chain.spec.maximum_gossip_clock_disparity(),
|
||||||
|
BeaconProcessorQueueLengths::from_state(
|
||||||
|
&beacon_chain
|
||||||
|
.canonical_head
|
||||||
|
.cached_head()
|
||||||
|
.snapshot
|
||||||
|
.beacon_state,
|
||||||
|
&beacon_chain.spec,
|
||||||
|
)?,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ use beacon_chain::{
|
|||||||
test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
|
test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
|
||||||
BeaconChain, BeaconChainTypes,
|
BeaconChain, BeaconChainTypes,
|
||||||
};
|
};
|
||||||
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig};
|
use beacon_processor::{
|
||||||
|
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
|
||||||
|
};
|
||||||
use directory::DEFAULT_ROOT_DIR;
|
use directory::DEFAULT_ROOT_DIR;
|
||||||
use eth2::{BeaconNodeHttpClient, Timeouts};
|
use eth2::{BeaconNodeHttpClient, Timeouts};
|
||||||
use lighthouse_network::{
|
use lighthouse_network::{
|
||||||
@@ -206,6 +208,11 @@ pub async fn create_api_server<T: BeaconChainTypes>(
|
|||||||
None,
|
None,
|
||||||
chain.slot_clock.clone(),
|
chain.slot_clock.clone(),
|
||||||
chain.spec.maximum_gossip_clock_disparity(),
|
chain.spec.maximum_gossip_clock_disparity(),
|
||||||
|
BeaconProcessorQueueLengths::from_state(
|
||||||
|
&chain.canonical_head.cached_head().snapshot.beacon_state,
|
||||||
|
&chain.spec,
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -239,6 +239,11 @@ impl TestRig {
|
|||||||
Some(work_journal_tx),
|
Some(work_journal_tx),
|
||||||
harness.chain.slot_clock.clone(),
|
harness.chain.slot_clock.clone(),
|
||||||
chain.spec.maximum_gossip_clock_disparity(),
|
chain.spec.maximum_gossip_clock_disparity(),
|
||||||
|
BeaconProcessorQueueLengths::from_state(
|
||||||
|
&chain.canonical_head.cached_head().snapshot.beacon_state,
|
||||||
|
&chain.spec,
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(beacon_processor.is_ok());
|
assert!(beacon_processor.is_ok());
|
||||||
|
|||||||
Reference in New Issue
Block a user