diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index fcdd57abbc..b262fe6ada 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -94,6 +94,7 @@ pub struct ChainConfig { /// The delay in milliseconds applied by the node between sending each blob or data column batch. /// This doesn't apply if the node is the block proposer. pub blob_publication_batch_interval: Duration, + pub disable_attesting: bool, } impl Default for ChainConfig { @@ -129,6 +130,7 @@ impl Default for ChainConfig { enable_sampling: false, blob_publication_batches: 4, blob_publication_batch_interval: Duration::from_millis(300), + disable_attesting: false, } } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5d75dc8c9a..ff2474531d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1859,6 +1859,13 @@ pub fn serve( network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request("Attesting disabled".to_string()), + )) + .await; + } + let attestations = attestations.into_iter().map(Either::Left).collect(); let result = crate::publish_attestations::publish_attestations( task_spawner, @@ -1891,6 +1898,12 @@ pub fn serve( network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request("Attesting disabled".to_string()), + )) + .await; + } let attestations = match crate::publish_attestations::deserialize_attestation_payload::( payload, fork_name, &log, @@ -1937,49 +1950,66 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { - task_spawner.blocking_response_task(Priority::P1, move || { - let query_filter = |data: &AttestationData| { - query.slot.is_none_or(|slot| slot == data.slot) - && query - .committee_index - .is_none_or(|index| index == data.index) - }; + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_response_task(Priority::P1, move || { + let query_filter = |data: &AttestationData| { + query.slot.is_none_or(|slot| slot == data.slot) + && query + .committee_index + .is_none_or(|index| index == data.index) + }; - let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); - attestations.extend( - chain - .naive_aggregation_pool - .read() - .iter() - .filter(|&att| query_filter(att.data())) - .cloned(), - ); - // Use the current slot to find the fork version, and convert all messages to the - // current fork's format. This is to ensure consistent message types matching - // `Eth-Consensus-Version`. - let current_slot = - chain - .slot_clock - .now() - .ok_or(warp_utils::reject::custom_server_error( - "unable to read slot clock".to_string(), - ))?; - let fork_name = chain.spec.fork_name_at_slot::(current_slot); - let attestations = attestations - .into_iter() - .filter(|att| { - (fork_name.electra_enabled() && matches!(att, Attestation::Electra(_))) - || (!fork_name.electra_enabled() - && matches!(att, Attestation::Base(_))) + let mut attestations = + chain.op_pool.get_filtered_attestations(query_filter); + attestations.extend( + chain + .naive_aggregation_pool + .read() + .iter() + .filter(|&att| query_filter(att.data())) + .cloned(), + ); + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = chain.slot_clock.now().ok_or( + warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ), + )?; + let fork_name = + chain.spec.fork_name_at_slot::(current_slot); + let attestations = attestations + .into_iter() + .filter(|att| { + (fork_name.electra_enabled() + && matches!(att, Attestation::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(att, Attestation::Base(_))) + }) + .collect::>(); + + let res = fork_versioned_response( + endpoint_version, + fork_name, + &attestations, + )?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }) - .collect::>(); - - let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?; - Ok(add_consensus_version_header( - warp::reply::json(&res).into_response(), - fork_name, - )) - }) + .await + } }, ); @@ -2200,12 +2230,24 @@ pub fn serve( signatures: Vec, network_tx: UnboundedSender>, log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { - sync_committees::process_sync_committee_signatures( - signatures, network_tx, &chain, log, - )?; - Ok(api_types::GenericResponse::from(())) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + sync_committees::process_sync_committee_signatures( + signatures, network_tx, &chain, log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + .await + } }, ); @@ -3419,10 +3461,22 @@ pub fn serve( indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - attester_duties::attester_duties(epoch, &indices.0, &chain) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + attester_duties::attester_duties(epoch, &indices.0, &chain) + }) + .await + } }, ); @@ -3447,10 +3501,22 @@ pub fn serve( indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::sync_committee_duties(epoch, &indices.0, &chain) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::sync_committee_duties(epoch, &indices.0, &chain) + }) + .await + } }, ); @@ -3468,23 +3534,35 @@ pub fn serve( not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - chain - .get_aggregated_sync_committee_contribution(&sync_committee_data) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "unable to fetch sync contribution: {:?}", - e - )) - })? - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching sync contribution found".to_string(), - ) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + chain + .get_aggregated_sync_committee_contribution(&sync_committee_data) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "unable to fetch sync contribution: {:?}", + e + )) + })? + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching sync contribution found".to_string(), + ) + }) }) - }) + .await + } }, ); @@ -3509,6 +3587,15 @@ pub fn serve( chain: Arc>, aggregates: Vec>, network_tx: UnboundedSender>, log: Logger| { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } task_spawner.blocking_json_task(Priority::P0, move || { not_synced_filter?; let seen_timestamp = timestamp_now(); @@ -3604,7 +3691,8 @@ pub fn serve( } else { Ok(()) } - }) + }).await + } }, ); @@ -3625,36 +3713,58 @@ pub fn serve( contributions: Vec>, network_tx: UnboundedSender>, log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::process_signed_contribution_and_proofs( - contributions, - network_tx, - &chain, - log, - )?; - Ok(api_types::GenericResponse::from(())) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::process_signed_contribution_and_proofs( + contributions, + network_tx, + &chain, + log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + .await + } }, ); // POST validator/beacon_committee_subscriptions - let post_validator_beacon_committee_subscriptions = eth_v1 - .and(warp::path("validator")) - .and(warp::path("beacon_committee_subscriptions")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(validator_subscription_tx_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(log_filter.clone()) - .then( - |subscriptions: Vec, - validator_subscription_tx: Sender, - task_spawner: TaskSpawner, - chain: Arc>, - log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { + let post_validator_beacon_committee_subscriptions = + eth_v1 + .and(warp::path("validator")) + .and(warp::path("beacon_committee_subscriptions")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(validator_subscription_tx_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(log_filter.clone()) + .then( + |subscriptions: Vec, + validator_subscription_tx: Sender, + task_spawner: TaskSpawner, + chain: Arc>, + log: Logger| { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner.blocking_json_task(Priority::P0, move || { let subscriptions: std::collections::BTreeSet<_> = subscriptions .iter() .map(|subscription| { @@ -3686,9 +3796,10 @@ pub fn serve( } Ok(()) - }) - }, - ); + }).await + } + }, + ); // POST validator/prepare_beacon_proposer let post_validator_prepare_beacon_proposer = eth_v1 @@ -3945,6 +4056,15 @@ pub fn serve( chain: Arc>, log: Logger | { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } task_spawner.blocking_json_task(Priority::P0, move || { for subscription in subscriptions { chain @@ -3969,7 +4089,8 @@ pub fn serve( } Ok(()) - }) + }).await + } }, ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2c8b271bd2..00fa726757 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -553,6 +553,14 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("disable-attesting") + .long("disable-attesting") + .help("Turn off attestation related APIs so that we have some hope of producing \ + blocks") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("http-sse-capacity-multiplier") .long("http-sse-capacity-multiplier") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 84320762d6..e07cf0193f 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -191,6 +191,10 @@ pub fn get_config( client_config.chain.enable_light_client_server = false; } + if cli_args.get_flag("disable-attesting") { + client_config.chain.disable_attesting = true; + } + if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { client_config.chain.shuffling_cache_size = cache_size; } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index da10c2c4bd..984fb45584 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2579,6 +2579,16 @@ fn light_client_http_server_disabled() { }); } +#[test] +fn disable_attesting() { + CommandLineTest::new() + .flag("disable-attesting", None) + .run_with_zero_port() + .with_config(|config| { + assert!(config.chain.disable_attesting); + }); +} + #[test] fn gui_flag() { CommandLineTest::new()