//! Import attestations and publish them to the network. //! //! This module gracefully handles attestations to unknown blocks by requeuing them and then //! efficiently waiting for them to finish reprocessing (using an async yield). //! //! The following comments relate to the handling of duplicate attestations (relocated here during //! refactoring): //! //! Skip to the next attestation since an attestation for this //! validator is already known in this epoch. //! //! There's little value for the network in validating a second //! attestation for another validator since it is either: //! //! 1. A duplicate. //! 2. Slashable. //! 3. Invalid. //! //! We are likely to get duplicates in the case where a VC is using //! fallback BNs. If the first BN actually publishes some/all of a //! batch of attestations but fails to respond in a timely fashion, //! the VC is likely to try publishing the attestations on another //! BN. That second BN may have already seen the attestations from //! the first BN and therefore indicate that the attestations are //! "already seen". An attestation that has already been seen has //! been published on the network so there's no actual error from //! the perspective of the user. //! //! It's better to prevent slashable attestations from ever //! appearing on the network than trying to slash validators, //! especially those validators connected to the local API. //! //! There might be *some* value in determining that this attestation //! is invalid, but since a valid attestation already it exists it //! appears that this validator is capable of producing valid //! attestations and there's no immediate cause for concern. use crate::task_spawner::{Priority, TaskSpawner}; use beacon_chain::{ AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes, validator_monitor::timestamp_now, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; use beacon_processor::{Work, WorkEvent}; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::{debug, error, warn}; use types::SingleAttestation; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] pub enum Error { Validation(AttestationError), Publication, ForkChoice(#[allow(dead_code)] Box), AggregationPool(#[allow(dead_code)] AttestationError), ReprocessDisabled, ReprocessFull, ReprocessTimeout, } enum PublishAttestationResult { Success, AlreadyKnown, Reprocessing(oneshot::Receiver>), Failure(Error), } fn verify_and_publish_attestation( chain: &Arc>, attestation: &SingleAttestation, seen_timestamp: Duration, network_tx: &UnboundedSender>, ) -> Result<(), Error> { let verified_attestation = chain .verify_unaggregated_attestation_for_gossip(attestation, None) .map_err(Error::Validation)?; network_tx .send(NetworkMessage::Publish { messages: vec![PubsubMessage::Attestation(Box::new(( verified_attestation.subnet_id(), attestation.clone(), )))], }) .map_err(|_| Error::Publication)?; // Notify the validator monitor. chain .validator_monitor .read() .register_api_unaggregated_attestation( seen_timestamp, verified_attestation.indexed_attestation(), &chain.slot_clock, ); let fc_result = chain.apply_attestation_to_fork_choice(&verified_attestation); let naive_aggregation_result = chain.add_to_naive_aggregation_pool(&verified_attestation); if let Err(e) = &fc_result { warn!( err = ?e, "Attestation invalid for fork choice" ); } if let Err(e) = &naive_aggregation_result { warn!( err = ?e, "Attestation invalid for aggregation" ); } if let Err(e) = fc_result { Err(Error::ForkChoice(Box::new(e))) } else if let Err(e) = naive_aggregation_result { Err(Error::AggregationPool(e)) } else { Ok(()) } } pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, attestations: Vec, network_tx: UnboundedSender>, allow_reprocess: bool, ) -> Result<(), warp::Rejection> { // Collect metadata about attestations which we'll use to report failures. We need to // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. let attestation_metadata = attestations .iter() .map(|att| (att.data.slot, Some(att.committee_index))) .collect::>(); // Gossip validate and publish attestations that can be immediately processed. let seen_timestamp = timestamp_now(); let mut prelim_results = task_spawner .clone() .blocking_task(Priority::P0, move || { Ok(attestations .into_iter() .map(|attestation| { match verify_and_publish_attestation( &chain, &attestation, seen_timestamp, &network_tx, ) { Ok(()) => PublishAttestationResult::Success, Err(Error::Validation(AttestationError::UnknownHeadBlock { beacon_block_root, })) => { if !allow_reprocess { return PublishAttestationResult::Failure(Error::ReprocessDisabled); }; // Re-process. let (tx, rx) = oneshot::channel(); let reprocess_chain = chain.clone(); let reprocess_network_tx = network_tx.clone(); let reprocess_fn = move || { let result = verify_and_publish_attestation( &reprocess_chain, &attestation, seen_timestamp, &reprocess_network_tx, ); // Ignore failure on the oneshot that reports the result. This // shouldn't happen unless some catastrophe befalls the waiting // thread which causes it to drop. let _ = tx.send(result); }; let reprocess_msg = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { beacon_block_root, process_fn: Box::new(reprocess_fn), }); if task_spawner .try_send(WorkEvent { drop_during_sync: false, work: Work::Reprocess(reprocess_msg), }) .is_err() { PublishAttestationResult::Failure(Error::ReprocessFull) } else { PublishAttestationResult::Reprocessing(rx) } } Err(Error::Validation(AttestationError::PriorAttestationKnown { .. })) => PublishAttestationResult::AlreadyKnown, Err(e) => PublishAttestationResult::Failure(e), } }) .map(Some) .collect::>()) }) .await?; // Asynchronously wait for re-processing of attestations to unknown blocks. This avoids blocking // any of the beacon processor workers while we wait for reprocessing. let (reprocess_indices, reprocess_futures): (Vec<_>, Vec<_>) = prelim_results .iter_mut() .enumerate() .filter_map(|(i, opt_result)| { if let Some(PublishAttestationResult::Reprocessing(..)) = &opt_result { let PublishAttestationResult::Reprocessing(rx) = opt_result.take()? else { // Unreachable. return None; }; Some((i, rx)) } else { None } }) .unzip(); let reprocess_results = futures::future::join_all(reprocess_futures).await; // Join everything back together and construct a response. // This part should be quick so we just stay in the Tokio executor's async task. for (i, reprocess_result) in reprocess_indices.into_iter().zip(reprocess_results) { let Some(result_entry) = prelim_results.get_mut(i) else { error!( case = "prelim out of bounds", request_index = i, "Unreachable case in attestation publishing" ); continue; }; *result_entry = Some(match reprocess_result { Ok(Ok(())) => PublishAttestationResult::Success, // Attestation failed processing on re-process. Ok(Err(Error::Validation(AttestationError::PriorAttestationKnown { .. }))) => { PublishAttestationResult::AlreadyKnown } Ok(Err(e)) => PublishAttestationResult::Failure(e), // Oneshot was dropped, indicating that the attestation either timed out in the // reprocess queue or was dropped due to some error. Err(_) => PublishAttestationResult::Failure(Error::ReprocessTimeout), }); } // Construct the response. let mut failures = vec![]; let mut num_already_known = 0; for (index, result) in prelim_results.iter().enumerate() { match result { Some(PublishAttestationResult::Success) => {} Some(PublishAttestationResult::AlreadyKnown) => num_already_known += 1, Some(PublishAttestationResult::Failure(e)) => { if let Some((slot, committee_index)) = attestation_metadata.get(index) { error!( error = ?e, request_index = index, committee_index, attestation_slot = %slot, "Failure verifying attestation for gossip" ); failures.push(Failure::new(index, format!("{e:?}"))); } else { error!( case = "out of bounds", request_index = index, "Unreachable case in attestation publishing" ); failures.push(Failure::new(index, "metadata logic error".into())); } } Some(PublishAttestationResult::Reprocessing(_)) => { error!( case = "reprocessing", request_index = index, "Unreachable case in attestation publishing" ); failures.push(Failure::new(index, "reprocess logic error".into())); } None => { error!( case = "result is None", request_index = index, "Unreachable case in attestation publishing" ); failures.push(Failure::new(index, "result logic error".into())); } } } if num_already_known > 0 { debug!( count = num_already_known, "Some unagg attestations already known" ); } if failures.is_empty() { Ok(()) } else { Err(warp_utils::reject::indexed_bad_request( "error processing attestations".to_string(), failures, )) } }