diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index ff8d34fdfd..a731c7cbb4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -599,39 +599,34 @@ pub fn serve( }, ); - // GET beacon/states/{state_id}/committees/{epoch} + // GET beacon/states/{state_id}/committees?slot,index,epoch let get_beacon_state_committees = beacon_states_path .clone() .and(warp::path("committees")) - .and(warp::path::param::()) .and(warp::query::()) .and(warp::path::end()) .and_then( - |state_id: StateId, - chain: Arc>, - epoch: Epoch, - query: api_types::CommitteesQuery| { + |state_id: StateId, chain: Arc>, query: api_types::CommitteesQuery| { + // the api spec says if the epoch is not present then the epoch of the state should be used + let query_state_id = query.epoch.map_or(state_id, |epoch| { + StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) + }); + blocking_json_task(move || { - state_id.map_state(&chain, |state| { - let relative_epoch = - RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err( - |_| { - warp_utils::reject::custom_bad_request(format!( - "state is epoch {} and only previous, current and next epochs are supported", - state.current_epoch() - )) - }, - )?; + query_state_id.map_state(&chain, |state| { + let epoch = state.slot.epoch(T::EthSpec::slots_per_epoch()); let committee_cache = if state - .committee_cache_is_initialized(relative_epoch) + .committee_cache_is_initialized(RelativeEpoch::Current) { - state.committee_cache(relative_epoch).map(Cow::Borrowed) + state + .committee_cache(RelativeEpoch::Current) + .map(Cow::Borrowed) } else { CommitteeCache::initialized(state, epoch, &chain.spec).map(Cow::Owned) } - .map_err(BeaconChainError::BeaconStateError) - .map_err(warp_utils::reject::beacon_chain_error)?; + .map_err(BeaconChainError::BeaconStateError) + .map_err(warp_utils::reject::beacon_chain_error)?; // Use either the supplied slot or all slots in the epoch. let slots = query.slot.map(|slot| vec![slot]).unwrap_or_else(|| { @@ -659,11 +654,11 @@ pub fn serve( let committee = committee_cache .get_beacon_committee(slot, index) .ok_or_else(|| { - warp_utils::reject::custom_bad_request(format!( - "committee index {} does not exist in epoch {}", - index, epoch - )) - })?; + warp_utils::reject::custom_bad_request(format!( + "committee index {} does not exist in epoch {}", + index, epoch + )) + })?; response.push(api_types::CommitteeData { index, @@ -906,63 +901,119 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) + .and(log_filter.clone()) .and_then( |chain: Arc>, - attestation: Attestation, - network_tx: UnboundedSender>| { + attestations: Vec>, + network_tx: UnboundedSender>, + log: Logger| { blocking_json_task(move || { - let attestation = chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), None) - .map_err(|e| { - warp_utils::reject::object_invalid(format!( - "gossip verification failed: {:?}", - e - )) - })?; + let mut failures = Vec::new(); - publish_pubsub_message( - &network_tx, - PubsubMessage::Attestation(Box::new(( - attestation.subnet_id(), - attestation.attestation().clone(), - ))), - )?; + for (index, attestation) in attestations.as_slice().iter().enumerate() { + let attestation = match chain + .verify_unaggregated_attestation_for_gossip(attestation.clone(), None) + { + Ok(attestation) => attestation, + Err(e) => { + error!(log, + "Failure verifying attestation for gossip"; + "error" => ?e, + "request_index" => index, + "committee_index" => attestation.data.index, + "attestation_slot" => attestation.data.slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Verification: {:?}", e), + )); + // skip to the next attestation so we do not publish this one to gossip + continue; + } + }; - chain - .apply_attestation_to_fork_choice(&attestation) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to fork choice: {:?}", - e - )) - })?; + publish_pubsub_message( + &network_tx, + PubsubMessage::Attestation(Box::new(( + attestation.subnet_id(), + attestation.attestation().clone(), + ))), + )?; - chain - .add_to_naive_aggregation_pool(attestation) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to naive aggregation pool: {:?}", - e - )) - })?; + let committee_index = attestation.attestation().data.index; + let slot = attestation.attestation().data.slot; - Ok(()) + if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) { + error!(log, + "Failure applying verified attestation to fork choice"; + "error" => ?e, + "request_index" => index, + "committee_index" => committee_index, + "slot" => slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Fork choice: {:?}", e), + )); + }; + + if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { + error!(log, + "Failure adding verified attestation to the naive aggregation pool"; + "error" => ?e, + "request_index" => index, + "committee_index" => committee_index, + "slot" => slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Naive aggregation pool: {:?}", e), + )); + } + } + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing attestations".to_string(), + failures, + )) + } }) }, ); - // GET beacon/pool/attestations + // GET beacon/pool/attestations?committee_index,slot let get_beacon_pool_attestations = beacon_pool_path .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let mut attestations = chain.op_pool.get_all_attestations(); - attestations.extend(chain.naive_aggregation_pool.read().iter().cloned()); - Ok(api_types::GenericResponse::from(attestations)) - }) - }); + .and(warp::query::()) + .and_then( + |chain: Arc>, query: api_types::AttestationPoolQuery| { + blocking_json_task(move || { + let query_filter = |attestation: &Attestation| { + query + .slot + .map_or(true, |slot| slot == attestation.data.slot) + && query + .committee_index + .map_or(true, |index| index == attestation.data.index) + }; + + let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); + attestations.extend( + chain + .naive_aggregation_pool + .read() + .iter() + .cloned() + .filter(query_filter), + ); + Ok(api_types::GenericResponse::from(attestations)) + }) + }, + ); // POST beacon/pool/attester_slashings let post_beacon_pool_attester_slashings = beacon_pool_path diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 875543bb7d..b257a20d59 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -6,6 +6,7 @@ use beacon_chain::{ }; use discv5::enr::{CombinedKey, EnrBuilder}; use environment::null_logger; +use eth2::Error; use eth2::{types::*, BeaconNodeHttpClient, Url}; use eth2_libp2p::{ rpc::methods::MetaData, @@ -624,14 +625,10 @@ impl ApiTester { for state_id in self.interesting_state_ids() { let mut state_opt = self.get_state(state_id); - let epoch = state_opt - .as_ref() - .map(|state| state.current_epoch()) - .unwrap_or_else(|| Epoch::new(0)); - + let epoch_opt = state_opt.as_ref().map(|state| state.current_epoch()); let results = self .client - .get_beacon_states_committees(state_id, epoch, None, None) + .get_beacon_states_committees(state_id, None, None, epoch_opt) .await .unwrap() .map(|res| res.data); @@ -641,11 +638,10 @@ impl ApiTester { } let state = state_opt.as_mut().expect("result should be none"); + state.build_all_committee_caches(&self.chain.spec).unwrap(); let committees = state - .get_beacon_committees_at_epoch( - RelativeEpoch::from_epoch(state.current_epoch(), epoch).unwrap(), - ) + .get_beacon_committees_at_epoch(RelativeEpoch::Current) .unwrap(); for (i, result) in results.unwrap().into_iter().enumerate() { @@ -886,45 +882,60 @@ impl ApiTester { } pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { - for attestation in &self.attestations { - self.client - .post_beacon_pool_attestations(attestation) - .await - .unwrap(); + self.client + .post_beacon_pool_attestations(self.attestations.as_slice()) + .await + .unwrap(); - assert!( - self.network_rx.try_recv().is_ok(), - "valid attestation should be sent to network" - ); - } + assert!( + self.network_rx.try_recv().is_ok(), + "valid attestation should be sent to network" + ); self } pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self { + let mut attestations = Vec::new(); for attestation in &self.attestations { - let mut attestation = attestation.clone(); - attestation.data.slot += 1; + let mut invalid_attestation = attestation.clone(); + invalid_attestation.data.slot += 1; - assert!(self - .client - .post_beacon_pool_attestations(&attestation) - .await - .is_err()); - - assert!( - self.network_rx.try_recv().is_err(), - "invalid attestation should not be sent to network" - ); + // add both to ensure we only fail on invalid attestations + attestations.push(attestation.clone()); + attestations.push(invalid_attestation); } + let err = self + .client + .post_beacon_pool_attestations(attestations.as_slice()) + .await + .unwrap_err(); + + match err { + Error::ServerIndexedMessage(IndexedErrorMessage { + code, + message: _, + failures, + }) => { + assert_eq!(code, 400); + assert_eq!(failures.len(), self.attestations.len()); + } + _ => panic!("query did not fail correctly"), + } + + assert!( + self.network_rx.try_recv().is_ok(), + "if some attestations are valid, we should send them to the network" + ); + self } pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client - .get_beacon_pool_attestations() + .get_beacon_pool_attestations(None, None) .await .unwrap() .data; diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 45c16cc2fc..c2047eba3f 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -344,6 +344,22 @@ impl OperationPool { .collect() } + /// Returns all known `Attestation` objects that pass the provided filter. + /// + /// This method may return objects that are invalid for block inclusion. + pub fn get_filtered_attestations(&self, filter: F) -> Vec> + where + F: Fn(&Attestation) -> bool, + { + self.attestations + .read() + .iter() + .map(|(_, attns)| attns.iter().cloned()) + .flatten() + .filter(filter) + .collect() + } + /// Returns all known `AttesterSlashing` objects. /// /// This method may return objects that are invalid for block inclusion. diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index be8fc0b59c..c6191b7fff 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -301,15 +301,15 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `GET beacon/states/{state_id}/committees?slot,index` + /// `GET beacon/states/{state_id}/committees?slot,index,epoch` /// /// Returns `Ok(None)` on a 404 error. pub async fn get_beacon_states_committees( &self, state_id: StateId, - epoch: Epoch, slot: Option, index: Option, + epoch: Option, ) -> Result>>, Error> { let mut path = self.eth_path()?; @@ -318,8 +318,7 @@ impl BeaconNodeHttpClient { .push("beacon") .push("states") .push(&state_id.to_string()) - .push("committees") - .push(&epoch.to_string()); + .push("committees"); if let Some(slot) = slot { path.query_pairs_mut() @@ -331,6 +330,11 @@ impl BeaconNodeHttpClient { .append_pair("index", &index.to_string()); } + if let Some(epoch) = epoch { + path.query_pairs_mut() + .append_pair("epoch", &epoch.to_string()); + } + self.get_opt(path).await } @@ -479,7 +483,7 @@ impl BeaconNodeHttpClient { /// `POST beacon/pool/attestations` pub async fn post_beacon_pool_attestations( &self, - attestation: &Attestation, + attestations: &[Attestation], ) -> Result<(), Error> { let mut path = self.eth_path()?; @@ -489,14 +493,23 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); - self.post(path, attestation).await?; + let response = self + .client + .post(path) + .json(attestations) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_indexed_error(response).await?; Ok(()) } - /// `GET beacon/pool/attestations` + /// `GET beacon/pool/attestations?slot,committee_index` pub async fn get_beacon_pool_attestations( &self, + slot: Option, + committee_index: Option, ) -> Result>>, Error> { let mut path = self.eth_path()?; @@ -506,6 +519,16 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); + if let Some(slot) = slot { + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + } + + if let Some(index) = committee_index { + path.query_pairs_mut() + .append_pair("committee_index", &index.to_string()); + } + self.get(path).await } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 51e3c6c339..0bed06f699 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -349,6 +349,13 @@ impl fmt::Display for ValidatorStatus { pub struct CommitteesQuery { pub slot: Option, pub index: Option, + pub epoch: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct AttestationPoolQuery { + pub slot: Option, + pub committee_index: Option, } #[derive(Deserialize)] diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index f4f4eb636e..fbad0d18ee 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -329,6 +329,8 @@ impl AttestationService { .map_err(|e| format!("Failed to produce attestation data: {:?}", e))? .data; + let mut attestations = Vec::with_capacity(validator_duties.len()); + for duty in validator_duties { // Ensure that all required fields are present in the validator duty. let ( @@ -370,39 +372,52 @@ impl AttestationService { signature: AggregateSignature::infinity(), }; - self.validator_store + if self + .validator_store .sign_attestation( duty.validator_pubkey(), validator_committee_position, &mut attestation, current_epoch, ) - .ok_or_else(|| "Failed to sign attestation".to_string())?; - - match self - .beacon_node - .post_beacon_pool_attestations(&attestation) - .await + .is_some() { - Ok(()) => info!( + attestations.push(attestation); + } else { + crit!( log, - "Successfully published attestation"; - "head_block" => format!("{:?}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "unaggregated", - ), - Err(e) => error!( - log, - "Unable to publish attestation"; - "error" => e.to_string(), - "committee_index" => attestation.data.index, + "Failed to sign attestation"; + "committee_index" => committee_index, "slot" => slot.as_u64(), - "type" => "unaggregated", - ), + ); + continue; } } + match self + .beacon_node + .post_beacon_pool_attestations(attestations.as_slice()) + .await + { + Ok(()) => info!( + log, + "Successfully published attestations"; + "count" => attestations.len(), + "head_block" => ?attestation_data.beacon_block_root, + "committee_index" => attestation_data.index, + "slot" => attestation_data.slot.as_u64(), + "type" => "unaggregated", + ), + Err(e) => error!( + log, + "Unable to publish attestations"; + "error" => ?e, + "committee_index" => attestation_data.index, + "slot" => slot.as_u64(), + "type" => "unaggregated", + ), + } + Ok(Some(attestation_data)) }