use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::{ ChainFilter, EthV1Filter, NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter, TaskSpawnerFilter, }; use crate::version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response, unsupported_version_rejection, }; use crate::{sync_committees, utils}; use beacon_chain::observed_operations::ObservationOutcome; use beacon_chain::payload_attestation_verification::Error as PayloadAttestationError; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bytes::Bytes; use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; use ssz::{Decode, Encode}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error, info, warn}; use types::{ Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage, ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage, }; use warp::filters::BoxedFilter; use warp::{Filter, Reply}; use warp_utils::reject::convert_rejection; pub type BeaconPoolPathFilter = BoxedFilter<( TaskSpawner<::EthSpec>, Arc>, )>; pub type BeaconPoolPathV2Filter = BoxedFilter<( TaskSpawner<::EthSpec>, Arc>, )>; pub type BeaconPoolPathAnyFilter = BoxedFilter<( EndpointVersion, TaskSpawner<::EthSpec>, Arc>, )>; /// POST beacon/pool/bls_to_execution_changes pub fn post_beacon_pool_bls_to_execution_changes( network_tx_filter: &NetworkTxFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, address_changes: Vec, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { let mut failures = vec![]; for (index, address_change) in address_changes.into_iter().enumerate() { let validator_index = address_change.message.validator_index; match chain.verify_bls_to_execution_change_for_http_api(address_change) { Ok(ObservationOutcome::New(verified_address_change)) => { let validator_index = verified_address_change.as_inner().message.validator_index; let address = verified_address_change .as_inner() .message .to_execution_address; // New to P2P *and* op pool, gossip immediately if post-Capella. let received_pre_capella = if chain.current_slot_is_post_capella().unwrap_or(false) { ReceivedPreCapella::No } else { ReceivedPreCapella::Yes }; if matches!(received_pre_capella, ReceivedPreCapella::No) { utils::publish_pubsub_message( &network_tx, PubsubMessage::BlsToExecutionChange(Box::new( verified_address_change.as_inner().clone(), )), )?; } // Import to op pool (may return `false` if there's a race). let imported = chain.import_bls_to_execution_change( verified_address_change, received_pre_capella, ); info!( %validator_index, ?address, published = matches!(received_pre_capella, ReceivedPreCapella::No), imported, "Processed BLS to execution change" ); } Ok(ObservationOutcome::AlreadyKnown) => { debug!(%validator_index, "BLS to execution change already known"); } Err(e) => { warn!( validator_index, reason = ?e, source = "HTTP", "Invalid BLS to execution change" ); failures.push(Failure::new(index, format!("invalid: {e:?}"))); } } } if failures.is_empty() { Ok(()) } else { Err(warp_utils::reject::indexed_bad_request( "some BLS to execution changes failed to verify".into(), failures, )) } }) }, ) .boxed() } /// GET beacon/pool/bls_to_execution_changes pub fn get_beacon_pool_bls_to_execution_changes( beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); Ok(GenericResponse::from(address_changes)) }) }, ) .boxed() } /// POST beacon/pool/sync_committees pub fn post_beacon_pool_sync_committees( network_tx_filter: &NetworkTxFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("sync_committees")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, signatures: Vec, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { sync_committees::process_sync_committee_signatures( signatures, network_tx, &chain, )?; Ok(GenericResponse::from(())) }) }, ) .boxed() } /// GET beacon/pool/voluntary_exits pub fn get_beacon_pool_voluntary_exits( beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_voluntary_exits(); Ok(GenericResponse::from(attestations)) }) }, ) .boxed() } /// POST beacon/pool/voluntary_exits pub fn post_beacon_pool_voluntary_exits( network_tx_filter: &NetworkTxFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, exit: SignedVoluntaryExit, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_voluntary_exit_for_gossip(exit.clone()) .map_err(|e| { warp_utils::reject::object_invalid(format!( "gossip verification failed: {:?}", e )) })?; // Notify the validator monitor. chain .validator_monitor .read() .register_api_voluntary_exit(&exit.message); if let ObservationOutcome::New(exit) = outcome { utils::publish_pubsub_message( &network_tx, PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())), )?; chain.import_voluntary_exit(exit); } Ok(()) }) }, ) .boxed() } /// GET beacon/pool/proposer_slashings pub fn get_beacon_pool_proposer_slashings( beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_proposer_slashings(); Ok(GenericResponse::from(attestations)) }) }, ) .boxed() } /// POST beacon/pool/proposer_slashings pub fn post_beacon_pool_proposer_slashings( network_tx_filter: &NetworkTxFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: ProposerSlashing, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_proposer_slashing_for_gossip(slashing.clone()) .map_err(|e| { warp_utils::reject::object_invalid(format!( "gossip verification failed: {:?}", e )) })?; // Notify the validator monitor. chain .validator_monitor .read() .register_api_proposer_slashing(&slashing); if let ObservationOutcome::New(slashing) = outcome { utils::publish_pubsub_message( &network_tx, PubsubMessage::ProposerSlashing(Box::new( slashing.clone().into_inner(), )), )?; chain.import_proposer_slashing(slashing); } Ok(()) }) }, ) .boxed() } /// GET beacon/pool/attester_slashings pub fn get_beacon_pool_attester_slashings( beacon_pool_path_any: &BeaconPoolPathAnyFilter, ) -> ResponseFilter { beacon_pool_path_any .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) .then( |endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_response_task(Priority::P1, move || { let slashings = chain.op_pool.get_all_attester_slashings(); // Use the current slot to find the fork version, and convert all messages to the // current fork's format. This is to ensure consistent message types matching // `Eth-Consensus-Version`. let current_slot = chain .slot_clock .now() .ok_or(warp_utils::reject::custom_server_error( "unable to read slot clock".to_string(), ))?; let fork_name = chain.spec.fork_name_at_slot::(current_slot); let slashings = slashings .into_iter() .filter(|slashing| { (fork_name.electra_enabled() && matches!(slashing, AttesterSlashing::Electra(_))) || (!fork_name.electra_enabled() && matches!(slashing, AttesterSlashing::Base(_))) }) .collect::>(); let require_version = match endpoint_version { V1 => ResponseIncludesVersion::No, V2 => ResponseIncludesVersion::Yes(fork_name), _ => return Err(unsupported_version_rejection(endpoint_version)), }; let res = beacon_response(require_version, &slashings); Ok(add_consensus_version_header( warp::reply::json(&res).into_response(), fork_name, )) }) }, ) .boxed() } // POST beacon/pool/attester_slashings pub fn post_beacon_pool_attester_slashings( network_tx_filter: &NetworkTxFilter, beacon_pool_path_any: &BeaconPoolPathAnyFilter, ) -> ResponseFilter { beacon_pool_path_any .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( // V1 and V2 are identical except V2 has a consensus version header in the request. // We only require this header for SSZ deserialization, which isn't supported for // this endpoint presently. |_endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_attester_slashing_for_gossip(slashing.clone()) .map_err(|e| { warp_utils::reject::object_invalid(format!( "gossip verification failed: {:?}", e )) })?; // Notify the validator monitor. chain .validator_monitor .read() .register_api_attester_slashing(slashing.to_ref()); if let ObservationOutcome::New(slashing) = outcome { utils::publish_pubsub_message( &network_tx, PubsubMessage::AttesterSlashing(Box::new( slashing.clone().into_inner(), )), )?; chain.import_attester_slashing(slashing); } Ok(()) }) }, ) .boxed() } /// GET beacon/pool/attestations?committee_index,slot pub fn get_beacon_pool_attestations( beacon_pool_path_any: &BeaconPoolPathAnyFilter, ) -> ResponseFilter { beacon_pool_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) .then( |endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>, query: AttestationPoolQuery| { task_spawner.blocking_response_task(Priority::P1, move || { let query_filter = |data: &AttestationData, committee_indices: HashSet| { query.slot.is_none_or(|slot| slot == data.slot) && query .committee_index .is_none_or(|index| committee_indices.contains(&index)) }; let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); attestations.extend( chain .naive_aggregation_pool .read() .iter() .filter(|&att| { query_filter(att.data(), att.get_committee_indices_map()) }) .cloned(), ); // Use the current slot to find the fork version, and convert all messages to the // current fork's format. This is to ensure consistent message types matching // `Eth-Consensus-Version`. let current_slot = chain .slot_clock .now() .ok_or(warp_utils::reject::custom_server_error( "unable to read slot clock".to_string(), ))?; let fork_name = chain.spec.fork_name_at_slot::(current_slot); let attestations = attestations .into_iter() .filter(|att| { (fork_name.electra_enabled() && matches!(att, Attestation::Electra(_))) || (!fork_name.electra_enabled() && matches!(att, Attestation::Base(_))) }) .collect::>(); let require_version = match endpoint_version { V1 => ResponseIncludesVersion::No, V2 => ResponseIncludesVersion::Yes(fork_name), _ => return Err(unsupported_version_rejection(endpoint_version)), }; let res = beacon_response(require_version, &attestations); Ok(add_consensus_version_header( warp::reply::json(&res).into_response(), fork_name, )) }) }, ) .boxed() } pub fn post_beacon_pool_attestations_v2( network_tx_filter: &NetworkTxFilter, optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, beacon_pool_path_v2: &BeaconPoolPathV2Filter, ) -> ResponseFilter { beacon_pool_path_v2 .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp_utils::json::json::>()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec, _fork_name: Option, network_tx: UnboundedSender>| async move { let result = crate::publish_attestations::publish_attestations( task_spawner, chain, attestations, network_tx, true, ) .await .map(|()| warp::reply::json(&())); convert_rejection(result).await }, ) .boxed() } /// POST beacon/pool/payload_attestations (JSON) pub fn post_beacon_pool_payload_attestations( network_tx_filter: &NetworkTxFilter, optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path .clone() .and(warp::path("payload_attestations")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, messages: Vec, _fork_name: Option, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { publish_payload_attestation_messages(&chain, &network_tx, messages) }) }, ) .boxed() } /// POST beacon/pool/payload_attestations (SSZ) pub fn post_beacon_pool_payload_attestations_ssz( eth_v1: EthV1Filter, task_spawner_filter: TaskSpawnerFilter, chain_filter: ChainFilter, network_tx_filter: NetworkTxFilter, ) -> ResponseFilter { eth_v1 .and(warp::path("beacon")) .and(warp::path("pool")) .and(warp::path("payload_attestations")) .and(warp::path::end()) .and(warp::body::bytes()) .and(task_spawner_filter) .and(chain_filter) .and(network_tx_filter) .then( |body_bytes: Bytes, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { let item_len = ::ssz_fixed_len(); if !body_bytes.len().is_multiple_of(item_len) { return Err(warp_utils::reject::custom_bad_request(format!( "SSZ body length {} is not a multiple of PayloadAttestationMessage size {}", body_bytes.len(), item_len, ))); } let messages: Vec = body_bytes .chunks(item_len) .map(|chunk| { PayloadAttestationMessage::from_ssz_bytes(chunk).map_err(|e| { warp_utils::reject::custom_bad_request(format!( "invalid SSZ: {e:?}" )) }) }) .collect::>()?; publish_payload_attestation_messages(&chain, &network_tx, messages) }) }, ) .boxed() } fn publish_payload_attestation_messages( chain: &BeaconChain, network_tx: &UnboundedSender>, messages: Vec, ) -> Result<(), warp::Rejection> { let mut failures = vec![]; let mut num_already_known = 0; for (index, message) in messages.into_iter().enumerate() { match chain.verify_payload_attestation_message_for_gossip(message.clone()) { Ok(verified) => { utils::publish_pubsub_message( network_tx, PubsubMessage::PayloadAttestation(Box::new(message)), )?; if let Err(e) = chain.apply_payload_attestation_to_fork_choice( verified.indexed_payload_attestation(), verified.ptc(), ) { warn!( error = ?e, request_index = index, "Payload attestation invalid for fork choice" ); } if let Err(e) = chain.add_payload_attestation_to_pool(&verified) { warn!( reason = ?e, "Failed to add payload attestation to pool" ); } } Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) => { num_already_known += 1; } // TODO(gloas): requeue for reprocessing like attestations do. Err(e) => { error!( error = ?e, request_index = index, "Failure verifying payload attestation for gossip" ); failures.push(Failure::new(index, format!("{e:?}"))); } } } if num_already_known > 0 { debug!( count = num_already_known, "Some payload attestations already known" ); } if failures.is_empty() { Ok(()) } else { Err(warp_utils::reject::indexed_bad_request( "error processing payload attestations".to_string(), failures, )) } }