From 88afbb39bec730388c61d8b080ebebc4c31a3657 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 2 Feb 2025 12:37:21 +0300 Subject: [PATCH] POST inclusion list endpoint --- beacon_node/beacon_chain/src/beacon_chain.rs | 22 ++ .../src/inclusion_list_verification.rs | 1 + beacon_node/beacon_chain/src/metrics.rs | 24 ++ .../beacon_chain/src/validator_monitor.rs | 18 +- beacon_node/http_api/src/lib.rs | 33 +-- .../http_api/src/publish_inclusion_lists.rs | 217 ++++++++++++++++++ .../gossip_methods.rs | 3 +- 7 files changed, 302 insertions(+), 16 deletions(-) create mode 100644 beacon_node/http_api/src/publish_inclusion_lists.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9f3f06ab8c..ebfe0f01c3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -34,6 +34,8 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::graffiti_calculator::GraffitiCalculator; use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker}; +use crate::inclusion_list_verification::GossipInclusionListError; +use crate::inclusion_list_verification::GossipVerifiedInclusionList; use crate::kzg_utils::reconstruct_blobs; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, @@ -2371,6 +2373,26 @@ impl BeaconChain { }) } + /// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if + /// it is valid to be (re)broadcast on the gossip network. + /// + /// The attestation must be "unaggregated", that is it must have exactly one + /// aggregation bit set. + pub fn verify_inclusion_list_for_gossip( + &self, + inclusion_list: &SignedInclusionList, + ) -> Result, GossipInclusionListError> { + metrics::inc_counter(&metrics::INCLUSION_LIST_PROCESSING_REQUESTS); + let _timer = + metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + GossipVerifiedInclusionList::verify(inclusion_list, self).inspect(|_v| { + // TODO(focil) emit event + if let Some(_event_handler) = self.event_handler.as_ref() {} + metrics::inc_counter(&metrics::INCLUSION_LIST_PROCESSING_SUCCESSES); + }) + } + /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// diff --git a/beacon_node/beacon_chain/src/inclusion_list_verification.rs b/beacon_node/beacon_chain/src/inclusion_list_verification.rs index 584996a847..1edb981d16 100644 --- a/beacon_node/beacon_chain/src/inclusion_list_verification.rs +++ b/beacon_node/beacon_chain/src/inclusion_list_verification.rs @@ -19,6 +19,7 @@ pub enum GossipInclusionListError { TooManyTransactions, InvalidSignature, BeaconChainError(BeaconChainError), + PriorInclusionListKnown, // TODO: equivocation e.g. PriorInclusionListKnown } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ae3add7f03..6071eeae57 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -509,6 +509,30 @@ pub static ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "beacon_inclusion_list_processing_requests_total", + "Count of all inclusion lists submitted for processing", + ) +}); +pub static INCLUSION_LIST_GOSSIP_VERIFICATION_TIMES: LazyLock> = + LazyLock::new(|| { + try_create_histogram( + "beacon_inclusion_list_gossip_verification_seconds", + "Full runtime of inclusion list gossip verification", + ) + }); +pub static INCLUSION_LIST_PROCESSING_SUCCESSES: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_inclusion_list_processing_successes_total", + "Number of inclusion lists verified for gossip", + ) + }); + /* * Fork Choice */ diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index f8a483c621..304c0bc5cc 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -28,7 +28,7 @@ use types::{ Attestation, AttestationData, AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, - SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, + SignedContributionAndProof, SignedInclusionList, Slot, SyncCommitteeMessage, VoluntaryExit, }; /// Used for Prometheus labels. @@ -1887,6 +1887,22 @@ impl ValidatorMonitor { }) } + /// Register an inclusion list seen on the HTTP API. + pub fn register_api_inclusion_list( + &self, + _seen_timestamp: Duration, + _inclusion_list: &SignedInclusionList, + _slot_clock: &S, + ) { + // TODO(focil) add logic to register IL + // self.register_unaggregated_attestation( + // "api", + // seen_timestamp, + // indexed_attestation, + // slot_clock, + // ) + } + /// Scrape `self` for metrics. /// /// Should be called whenever Prometheus is scraping Lighthouse. diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index dfe65db92e..1fc5fe7399 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -20,6 +20,7 @@ mod produce_block; mod proposer_duties; mod publish_attestations; mod publish_blocks; +mod publish_inclusion_lists; mod standard_block_rewards; mod state_id; mod sync_committee_rewards; @@ -2318,30 +2319,33 @@ pub fn serve( ); // POST beacon/pool/inclusion_lists - // TODO(focil) unused endpoint and variables - let _post_beacon_pool_inclusion_lists = beacon_pool_path + let post_beacon_pool_inclusion_lists = beacon_pool_path .clone() .and(warp::path("inclusion_lists")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) + .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( |task_spawner: TaskSpawner, - _chain: Arc>, + chain: Arc>, inclusion_lists: Vec>, - _network_tx: UnboundedSender>, - log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { - // TODO(focil): actually gossip the inclusion lists - info!( - log, - "Posting signed inclusion lists for gossip"; - "num_inclusion_lists" => inclusion_lists.len(), - ); + network_tx: UnboundedSender>, + reprocess_tx: Option>, + log: Logger| async move { + let result = crate::publish_inclusion_lists::publish_inclusion_lists( + task_spawner, + chain, + inclusion_lists, + network_tx, + reprocess_tx, + log, + ) + .await + .map(|()| warp::reply::json(&())); - Ok(()) - }) + convert_rejection(result).await }, ); @@ -4913,6 +4917,7 @@ pub fn serve( .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) + .uor(post_beacon_pool_inclusion_lists) .recover(warp_utils::reject::handle_rejection), ), ) diff --git a/beacon_node/http_api/src/publish_inclusion_lists.rs b/beacon_node/http_api/src/publish_inclusion_lists.rs new file mode 100644 index 0000000000..5437b1f4de --- /dev/null +++ b/beacon_node/http_api/src/publish_inclusion_lists.rs @@ -0,0 +1,217 @@ +use std::{sync::Arc, time::Duration}; + +use beacon_chain::inclusion_list_verification::GossipInclusionListError; +use beacon_chain::{validator_monitor::timestamp_now, BeaconChain, BeaconChainTypes}; +use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use eth2::types::Failure; +use lighthouse_network::PubsubMessage; +use network::NetworkMessage; +use slog::{debug, error, info, Logger}; +use tokio::sync::{ + mpsc::{Sender, UnboundedSender}, + oneshot, +}; +use types::SignedInclusionList; + +use crate::task_spawner::{Priority, TaskSpawner}; + +enum PublishInclusionListResult { + Success, + #[allow(dead_code)] + Reprocessing(oneshot::Receiver>), + Failure(Error), + AlreadyKnown, +} + +#[derive(Debug)] +pub enum Error { + Validation(GossipInclusionListError), + Publication, + ReprocessTimeout, +} + +pub async fn publish_inclusion_lists( + task_spawner: TaskSpawner, + chain: Arc>, + inclusion_lists: Vec>, + network_tx: UnboundedSender>, + _reprocess_send: Option>, + log: Logger, +) -> Result<(), warp::Rejection> { + // Gossip validate and publish inclusion lists that can be immediately processed. + let seen_timestamp = timestamp_now(); + let inner_log = log.clone(); + + let inclusion_list_metadata = inclusion_lists + .iter() + .map(|inclusion_list| { + ( + inclusion_list.message.slot, + inclusion_list.message.validator_index, + ) + }) + .collect::>(); + + let mut prelim_results = task_spawner + .blocking_task(Priority::P0, move || { + Ok(inclusion_lists + .into_iter() + .map(move |inclusion_list| { + match verify_and_publish_inclusion_list( + &chain, + &inclusion_list, + seen_timestamp, + &network_tx, + &inner_log, + ) { + Ok(()) => PublishInclusionListResult::Success, + Err(e) => PublishInclusionListResult::Failure(e), + } + }) + .map(Some) + .collect::>()) + }) + .await?; + + let (reprocess_indices, reprocess_futures): (Vec<_>, Vec<_>) = prelim_results + .iter_mut() + .enumerate() + .filter_map(|(i, opt_result)| { + if let Some(PublishInclusionListResult::Reprocessing(..)) = &opt_result { + let PublishInclusionListResult::Reprocessing(rx) = opt_result.take()? else { + // Unreachable. + return None; + }; + Some((i, rx)) + } else { + None + } + }) + .unzip(); + + let reprocess_results = futures::future::join_all(reprocess_futures).await; + + // Join everything back together and construct a response. + // This part should be quick so we just stay in the Tokio executor's async task. + for (i, reprocess_result) in reprocess_indices.into_iter().zip(reprocess_results) { + let Some(result_entry) = prelim_results.get_mut(i) else { + error!( + log, + "Unreachable case in inclusion list publishing"; + "case" => "prelim out of bounds", + "request_index" => i, + ); + continue; + }; + *result_entry = Some(match reprocess_result { + Ok(Ok(())) => PublishInclusionListResult::Success, + // Inclusion list already known + Ok(Err(Error::Validation(GossipInclusionListError::PriorInclusionListKnown))) => { + PublishInclusionListResult::AlreadyKnown + } + Ok(Err(e)) => PublishInclusionListResult::Failure(e), + // Oneshot was dropped, indicating that the inclusion list either timed out in the + // reprocess queue or was dropped due to some error. + Err(_) => PublishInclusionListResult::Failure(Error::ReprocessTimeout), + }); + } + + // Construct the response. + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, result) in prelim_results.iter().enumerate() { + match result { + Some(PublishInclusionListResult::Success) => {} + Some(PublishInclusionListResult::AlreadyKnown) => num_already_known += 1, + Some(PublishInclusionListResult::Failure(e)) => { + if let Some((slot, validator_index)) = inclusion_list_metadata.get(index) { + error!( + log, + "Failure verifying attestation for gossip"; + "error" => ?e, + "request_index" => index, + "validator_index" => validator_index, + "inclusion_list_slot" => slot, + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } else { + error!( + log, + "Unreachable case in inclusion list publishing"; + "case" => "out of bounds", + "request_index" => index + ); + failures.push(Failure::new(index, "metadata logic error".into())); + } + } + Some(PublishInclusionListResult::Reprocessing(_)) => { + // TODO(focil) reprocessing + info!( + log, + "Reprocessing result"; + ); + } + None => { + error!( + log, + "Unreachable case in inclusion list publishing"; + "case" => "result is None", + "request_index" => index + ); + failures.push(Failure::new(index, "result logic error".into())); + } + } + } + + if num_already_known > 0 { + debug!( + log, + "Some inclusion lists already known"; + "count" => num_already_known + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing inclusion list".to_string(), + failures, + )) + } +} + +fn verify_and_publish_inclusion_list( + chain: &Arc>, + inclusion_list: &SignedInclusionList, + seen_timestamp: Duration, + network_tx: &UnboundedSender>, + _log: &Logger, +) -> Result<(), Error> { + let verified_inclusion_list = chain + .verify_inclusion_list_for_gossip(inclusion_list) + .map_err(Error::Validation)?; + + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::InclusionList(Box::new( + verified_inclusion_list.signed_il.clone(), + ))], + }) + .map_err(|_| Error::Publication)?; + + // TODO(focil) add reprocess logic? + + // Notify the validator monitor. + chain.validator_monitor.read().register_api_inclusion_list( + seen_timestamp, + &verified_inclusion_list.signed_il, + &chain.slot_clock, + ); + + // Store verified IL in the IL cache + chain.on_verified_inclusion_list(verified_inclusion_list.signed_il); + + Ok(()) +} diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index bab0b7a07e..be64c8684f 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2185,7 +2185,8 @@ impl NetworkBeaconProcessor { | GossipInclusionListError::PastSlot { .. } | GossipInclusionListError::ValidatorNotInCommittee | GossipInclusionListError::TooManyTransactions - | GossipInclusionListError::InvalidSignature => { + | GossipInclusionListError::InvalidSignature + | GossipInclusionListError::PriorInclusionListKnown => { debug!(self.log, "Could not verify inclusion list for gossip. Rejecting the inclusion list"; "error" => ?err); } GossipInclusionListError::InvalidCommitteeRoot => {