Add BeaconProcessorConfig

This commit is contained in:
Paul Hauner
2023-07-07 11:45:33 +10:00
parent 8a765938e8
commit 1157568c0d
9 changed files with 131 additions and 59 deletions

2
Cargo.lock generated
View File

@@ -714,7 +714,9 @@ dependencies = [
"lighthouse_metrics",
"lighthouse_network",
"logging",
"num_cpus",
"parking_lot 0.12.1",
"serde",
"slog",
"slot_clock",
"strum",

View File

@@ -21,4 +21,6 @@ types = { path = "../../consensus/types" }
ethereum_ssz = "0.5.0"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.12.0"
parking_lot = "0.12.0"
num_cpus = "1.13.0"
serde = { version = "1.0.116", features = ["derive"] }

View File

@@ -48,6 +48,7 @@ use lighthouse_network::NetworkGlobals;
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::cmp;
@@ -70,7 +71,7 @@ pub mod work_reprocessing_queue;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
@@ -79,7 +80,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const MAX_IDLE_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for re-processing work events.
pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4;
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
@@ -226,6 +227,55 @@ pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
pub max_workers: usize,
pub max_work_event_queue_len: usize,
pub max_scheduled_work_queue_len: usize,
pub enable_backfill_rate_limiting: bool,
}
impl Default for BeaconProcessorConfig {
fn default() -> Self {
Self {
max_workers: cmp::max(1, num_cpus::get()),
max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
enable_backfill_rate_limiting: true,
}
}
}
// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}
impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}
impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
fn default() -> Self {
Self::new(&BeaconProcessorConfig::default())
}
}
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
queue: VecDeque<T>,
@@ -656,7 +706,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize,
pub enable_backfill_rate_limiting: bool,
pub config: BeaconProcessorConfig,
pub log: Logger,
}
@@ -737,7 +787,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(MAX_SCHEDULED_WORK_QUEUE_LEN);
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
@@ -757,7 +807,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
reprocess_work_rx: ready_work_rx,
};
let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting;
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
loop {
let work_event = match inbound_events.next().await {

View File

@@ -13,10 +13,8 @@ use beacon_chain::{
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler,
};
use beacon_processor::{
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessor, BeaconProcessorSend,
WorkEvent, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
};
use beacon_processor::BeaconProcessorConfig;
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2::{
@@ -37,7 +35,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use timer::spawn_timer;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
ExecutionBlockHash, Hash256, SignedBeaconBlock,
@@ -76,11 +74,9 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
http_api_config: http_api::Config,
http_metrics_config: http_metrics::Config,
slasher: Option<Arc<Slasher<T::EthSpec>>>,
beacon_processor_config: Option<BeaconProcessorConfig>,
beacon_processor_channels: Option<BeaconProcessorChannels<T::EthSpec>>,
eth_spec_instance: T::EthSpec,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_receive: mpsc::Receiver<WorkEvent<T::EthSpec>>,
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@@ -96,10 +92,6 @@ where
///
/// The `eth_spec_instance` parameter is used to concretize `TEthSpec`.
pub fn new(eth_spec_instance: TEthSpec) -> Self {
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
Self {
slot_clock: None,
store: None,
@@ -117,10 +109,8 @@ where
http_metrics_config: <_>::default(),
slasher: None,
eth_spec_instance,
beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
beacon_processor_receive,
work_reprocessing_tx,
work_reprocessing_rx,
beacon_processor_config: None,
beacon_processor_channels: None,
}
}
@@ -136,6 +126,12 @@ where
self
}
pub fn beacon_processor(mut self, config: BeaconProcessorConfig) -> Self {
self.beacon_processor_channels = Some(BeaconProcessorChannels::new(&config));
self.beacon_processor_config = Some(config);
self
}
pub fn slasher(mut self, slasher: Arc<Slasher<TEthSpec>>) -> Self {
self.slasher = Some(slasher);
self
@@ -571,6 +567,10 @@ where
.as_ref()
.ok_or("network requires a runtime_context")?
.clone();
let beacon_processor_channels = self
.beacon_processor_channels
.as_ref()
.ok_or("network requires beacon_processor_channels")?;
// If gossipsub metrics are required we build a registry to record them
let mut gossipsub_registry = if config.metrics_enabled {
@@ -586,8 +586,8 @@ where
gossipsub_registry
.as_mut()
.map(|registry| registry.sub_registry_with_prefix("gossipsub")),
self.beacon_processor_send.clone(),
self.work_reprocessing_tx.clone(),
beacon_processor_channels.beacon_processor_tx.clone(),
beacon_processor_channels.work_reprocessing_tx.clone(),
)
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;
@@ -710,6 +710,14 @@ where
.runtime_context
.as_ref()
.ok_or("build requires a runtime context")?;
let beacon_processor_channels = self
.beacon_processor_channels
.take()
.ok_or("build requires beacon_processor_channels")?;
let beacon_processor_config = self
.beacon_processor_config
.take()
.ok_or("build requires a beacon_processor_config")?;
let log = runtime_context.log().clone();
let http_api_listen_addr = if self.http_api_config.enabled {
@@ -719,7 +727,7 @@ where
network_senders: self.network_senders.clone(),
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
beacon_processor_send: Some(self.beacon_processor_send.clone()),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(),
log: log.clone(),
});
@@ -783,15 +791,13 @@ where
executor: beacon_processor_context.executor.clone(),
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
enable_backfill_rate_limiting: beacon_chain
.config
.enable_backfill_rate_limiting,
config: beacon_processor_config,
log: beacon_processor_context.log().clone(),
}
.spawn_manager(
self.beacon_processor_receive,
self.work_reprocessing_tx,
self.work_reprocessing_rx,
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
);

View File

@@ -1,4 +1,5 @@
use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;
use beacon_processor::BeaconProcessorConfig;
use directory::DEFAULT_ROOT_DIR;
use environment::LoggerConfig;
use network::NetworkConfig;
@@ -80,6 +81,7 @@ pub struct Config {
pub slasher: Option<slasher::Config>,
pub logger_config: LoggerConfig,
pub always_prefer_builder_payload: bool,
pub beacon_processor: BeaconProcessorConfig,
}
impl Default for Config {
@@ -107,6 +109,7 @@ impl Default for Config {
validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD,
logger_config: LoggerConfig::default(),
always_prefer_builder_payload: false,
beacon_processor: <_>::default(),
}
}
}

View File

@@ -5,9 +5,7 @@ use beacon_chain::{
},
BeaconChain, BeaconChainTypes,
};
use beacon_processor::{
BeaconProcessor, BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
};
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig};
use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts};
use lighthouse_network::{
@@ -33,7 +31,6 @@ use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use task_executor::test_utils::TestRuntime;
use tokio::sync::mpsc;
use types::{ChainSpec, EthSpec};
pub const TCP_PORT: u16 = 42;
@@ -187,16 +184,22 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap();
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let beacon_processor_send = BeaconProcessorSend(beacon_processor_tx);
let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let beacon_processor_config = BeaconProcessorConfig::default();
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let beacon_processor_send = beacon_processor_tx;
let test_runtime = TestRuntime::default();
BeaconProcessor {
network_globals: network_globals.clone(),
executor: test_runtime.task_executor.clone(),
max_workers: 1, // Single-threaded beacon processor.
current_workers: 0,
enable_backfill_rate_limiting: chain.config.enable_backfill_rate_limiting,
config: beacon_processor_config,
log: log.clone(),
}
.spawn_manager(

View File

@@ -7,9 +7,9 @@ use beacon_chain::{
};
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
use beacon_processor::{
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache,
GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent,
MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend,
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
WorkEvent as BeaconWorkEvent,
};
use environment::null_logger;
use lighthouse_network::{
@@ -545,11 +545,15 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
pub fn null_for_testing(
network_globals: Arc<NetworkGlobals<E>>,
) -> (Self, mpsc::Receiver<BeaconWorkEvent<E>>) {
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx: _work_reprocessing_rx,
} = <_>::default();
let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
let (reprocess_tx, _reprocess_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let log = null_logger().unwrap();
let harness: BeaconChainHarness<TestBeaconChainType<E>> =
BeaconChainHarness::builder(E::default())
@@ -562,19 +566,19 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
let runtime = TestRuntime::default();
let network_beacon_processor = Self {
beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
beacon_processor_send: beacon_processor_tx,
duplicate_cache: DuplicateCache::default(),
chain: harness.chain,
network_tx,
sync_tx,
reprocess_tx,
reprocess_tx: work_reprocessing_tx,
network_globals,
invalid_block_storage: InvalidBlockStorage::Disabled,
executor: runtime.task_executor.clone(),
log,
};
(network_beacon_processor, beacon_processor_receive)
(network_beacon_processor, beacon_processor_rx)
}
}

View File

@@ -4,15 +4,13 @@ mod tests {
use crate::persisted_dht::load_dht;
use crate::{NetworkConfig, NetworkService};
use beacon_chain::test_utils::BeaconChainHarness;
use beacon_processor::{
BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
};
use beacon_processor::BeaconProcessorChannels;
use lighthouse_network::Enr;
use slog::{o, Drain, Level, Logger};
use sloggers::{null::NullLoggerBuilder, Build};
use std::str::FromStr;
use std::sync::Arc;
use tokio::{runtime::Runtime, sync::mpsc};
use tokio::runtime::Runtime;
use types::MinimalEthSpec;
fn get_logger(actual_log: bool) -> Logger {
@@ -70,17 +68,20 @@ mod tests {
// Create a new network service which implicitly gets dropped at the
// end of the block.
let (beacon_processor_send, _beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let (beacon_processor_reprocess_tx, _beacon_processor_reprocess_rx) =
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx: _beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx: _work_reprocessing_rx,
} = <_>::default();
let _network_service = NetworkService::start(
beacon_chain.clone(),
&config,
executor,
None,
BeaconProcessorSend(beacon_processor_send),
beacon_processor_reprocess_tx,
beacon_processor_tx,
work_reprocessing_tx,
)
.await
.unwrap();

View File

@@ -83,6 +83,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
let builder = ClientBuilder::new(context.eth_spec_instance.clone())
.runtime_context(context)
.chain_spec(spec)
.beacon_processor(client_config.beacon_processor.clone())
.http_api_config(client_config.http_api.clone())
.disk_store(&db_path, &freezer_db_path, store_config, log.clone())?;