Add CLI flags

This commit is contained in:
Paul Hauner
2023-07-07 14:18:21 +10:00
parent 1157568c0d
commit e03017dfad
7 changed files with 106 additions and 20 deletions

1
Cargo.lock generated
View File

@@ -4708,6 +4708,7 @@ dependencies = [
"account_manager", "account_manager",
"account_utils", "account_utils",
"beacon_node", "beacon_node",
"beacon_processor",
"bls", "bls",
"boot_node", "boot_node",
"clap", "clap",

View File

@@ -79,8 +79,6 @@ pub struct ChainConfig {
/// ///
/// This is useful for block builders and testing. /// This is useful for block builders and testing.
pub always_prepare_payload: bool, pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation. /// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode, pub progressive_balances_mode: ProgressiveBalancesMode,
} }
@@ -112,7 +110,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false, genesis_backfill: false,
always_prepare_payload: false, always_prepare_payload: false,
enable_backfill_rate_limiting: true,
progressive_balances_mode: ProgressiveBalancesMode::Checked, progressive_balances_mode: ProgressiveBalancesMode::Checked,
} }
} }

View File

@@ -68,16 +68,21 @@ struct TestRig {
impl Drop for TestRig { impl Drop for TestRig {
fn drop(&mut self) { fn drop(&mut self) {
// Causes the beacon processor to shutdown. // Causes the beacon processor to shutdown.
self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN).0); let len = BeaconProcessorConfig::default().max_work_event_queue_len;
self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(len).0);
} }
} }
impl TestRig { impl TestRig {
pub async fn new(chain_length: u64) -> Self { pub async fn new(chain_length: u64) -> Self {
Self::new_with_chain_config(chain_length, ChainConfig::default()).await Self::new_parametric(
chain_length,
BeaconProcessorConfig::default().enable_backfill_rate_limiting,
)
.await
} }
pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self { pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self {
// This allows for testing voluntary exits without building out a massive chain. // This allows for testing voluntary exits without building out a massive chain.
let mut spec = E::default_spec(); let mut spec = E::default_spec();
spec.shard_committee_period = 2; spec.shard_committee_period = 2;
@@ -86,7 +91,7 @@ impl TestRig {
.spec(spec) .spec(spec)
.deterministic_keypairs(VALIDATOR_COUNT) .deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store() .fresh_ephemeral_store()
.chain_config(chain_config) .chain_config(<_>::default())
.build(); .build();
harness.advance_slot(); harness.advance_slot();
@@ -172,8 +177,15 @@ impl TestRig {
let log = harness.logger().clone(); let log = harness.logger().clone();
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); let mut beacon_processor_config = BeaconProcessorConfig::default();
let beacon_processor_tx = BeaconProcessorSend(beacon_processor_tx); beacon_processor_config.enable_backfill_rate_limiting = enable_backfill_rate_limiting;
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
// Default metadata // Default metadata
@@ -196,8 +208,6 @@ impl TestRig {
let executor = harness.runtime.task_executor.clone(); let executor = harness.runtime.task_executor.clone();
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364); let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364);
let duplicate_cache = DuplicateCache::default(); let duplicate_cache = DuplicateCache::default();
@@ -220,7 +230,7 @@ impl TestRig {
executor, executor,
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
enable_backfill_rate_limiting: harness.chain.config.enable_backfill_rate_limiting, config: beacon_processor_config,
log: log.clone(), log: log.clone(),
} }
.spawn_manager( .spawn_manager(
@@ -940,11 +950,8 @@ async fn test_backfill_sync_processing() {
/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
#[tokio::test] #[tokio::test]
async fn test_backfill_sync_processing_rate_limiting_disabled() { async fn test_backfill_sync_processing_rate_limiting_disabled() {
let chain_config = ChainConfig { let enable_backfill_rate_limiting = false;
enable_backfill_rate_limiting: false, let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await;
..Default::default()
};
let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await;
for _ in 0..3 { for _ in 0..3 {
rig.enqueue_backfill_batch(); rig.enqueue_backfill_batch();

View File

@@ -1131,4 +1131,36 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true) .takes_value(true)
.possible_values(ProgressiveBalancesMode::VARIANTS) .possible_values(ProgressiveBalancesMode::VARIANTS)
) )
.arg(
Arg::with_name("beacon-processor-max-workers")
.long("beacon-processor-max-workers")
.value_name("INTEGER")
.help("Specifies the maximum concurrent tasks for the task scheduler. Increasing \
this value may increase resource consumption. Reducing the value \
may result in decreased resource usage and diminished performance. The \
default value is the number of logical CPU cores on the host.")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-work-queue")
.long("beacon-processor-work-queue")
.value_name("INTEGER")
.help("Specifies the length of the inbound event queue. Increasing this value \
may prevent messages from being dropped at risk of overwhelming the \
host resources. Decreasing this value may cause messages to be dropped but \
may help resource-constrained hosts.")
.default_value("16384")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-reprocess-queue")
.long("beacon-processor-reprocess-queue")
.value_name("INTEGER")
.help("Specifies the length of the queue for messages requiring delayed processing. \
Increasing this value may prevent messages from being dropped at risk of \
overwhelming the host resources. Decreasing this value may cause messages \
to be dropped but may help resource-constrained hosts.")
.default_value("12288")
.takes_value(true)
)
} }

View File

@@ -792,7 +792,7 @@ pub fn get_config<E: EthSpec>(
} }
// Backfill sync rate-limiting // Backfill sync rate-limiting
client_config.chain.enable_backfill_rate_limiting = client_config.beacon_processor.enable_backfill_rate_limiting =
!cli_args.is_present("disable-backfill-rate-limiting"); !cli_args.is_present("disable-backfill-rate-limiting");
if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")? if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")?
@@ -806,6 +806,20 @@ pub fn get_config<E: EthSpec>(
client_config.chain.progressive_balances_mode = progressive_balances_mode; client_config.chain.progressive_balances_mode = progressive_balances_mode;
} }
if let Some(max_workers) = clap_utils::parse_optional(cli_args, "beacon-processor-max-workers")?
{
client_config.beacon_processor.max_workers = max_workers;
}
if client_config.beacon_processor.max_workers == 0 {
return Err("--beacon-processor-max-workers must be a non-zero value".to_string());
}
client_config.beacon_processor.max_work_event_queue_len =
clap_utils::parse_required(cli_args, "beacon-processor-work-queue")?;
client_config.beacon_processor.max_scheduled_work_queue_len =
clap_utils::parse_required(cli_args, "beacon-processor-reprocess-queue")?;
Ok(client_config) Ok(client_config)
} }

View File

@@ -64,6 +64,7 @@ slashing_protection = { path = "../validator_client/slashing_protection" }
lighthouse_network = { path = "../beacon_node/lighthouse_network" } lighthouse_network = { path = "../beacon_node/lighthouse_network" }
sensitive_url = { path = "../common/sensitive_url" } sensitive_url = { path = "../common/sensitive_url" }
eth1 = { path = "../beacon_node/eth1" } eth1 = { path = "../beacon_node/eth1" }
beacon_processor = { path = "../beacon_node/beacon_processor" }
[[test]] [[test]]
name = "lighthouse_tests" name = "lighthouse_tests"

View File

@@ -5,6 +5,7 @@ use beacon_node::beacon_chain::chain_config::{
DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR,
DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD, DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD,
}; };
use beacon_processor::BeaconProcessorConfig;
use eth1::Eth1Endpoint; use eth1::Eth1Endpoint;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use std::fs::File; use std::fs::File;
@@ -1118,13 +1119,13 @@ fn disable_backfill_rate_limiting_flag() {
CommandLineTest::new() CommandLineTest::new()
.flag("disable-backfill-rate-limiting", None) .flag("disable-backfill-rate-limiting", None)
.run_with_zero_port() .run_with_zero_port()
.with_config(|config| assert!(!config.chain.enable_backfill_rate_limiting)); .with_config(|config| assert!(!config.beacon_processor.enable_backfill_rate_limiting));
} }
#[test] #[test]
fn default_backfill_rate_limiting_flag() { fn default_backfill_rate_limiting_flag() {
CommandLineTest::new() CommandLineTest::new()
.run_with_zero_port() .run_with_zero_port()
.with_config(|config| assert!(config.chain.enable_backfill_rate_limiting)); .with_config(|config| assert!(config.beacon_processor.enable_backfill_rate_limiting));
} }
#[test] #[test]
fn default_boot_nodes() { fn default_boot_nodes() {
@@ -2277,3 +2278,36 @@ fn progressive_balances_fast() {
) )
}); });
} }
#[test]
fn beacon_processor() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.beacon_processor, <_>::default()));
CommandLineTest::new()
.flag("beacon-processor-max-workers", Some("1"))
.flag("beacon-processor-work-queue", Some("2"))
.flag("beacon-processor-reprocess-queue", Some("3"))
.flag("disable-backfill-rate-limiting", None)
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.beacon_processor,
BeaconProcessorConfig {
max_workers: 1,
max_work_event_queue_len: 2,
max_scheduled_work_queue_len: 3,
enable_backfill_rate_limiting: false
}
)
});
}
#[test]
#[should_panic]
fn beacon_processor_zero_workers() {
CommandLineTest::new()
.flag("beacon-processor-max-workers", Some("0"))
.run_with_zero_port();
}