diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6ad0628d3d..501ac9260b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1615,6 +1615,13 @@ impl BeaconChain { metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::SYNC_CONTRIBUTION_GOSSIP_VERIFICATION_TIMES); VerifiedSyncContribution::verify(sync_contribution, self).map(|v| { + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_contribution_subscribers() { + event_handler.register(EventKind::ContributionAndProof(Box::new( + v.aggregate().clone(), + ))); + } + } metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_SUCCESSES); v }) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index b71236bc59..1db7457279 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -12,28 +12,14 @@ pub struct ServerSentEventHandler { finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, - chain_reorg: Sender>, + chain_reorg_tx: Sender>, + contribution_tx: Sender>, log: Logger, } impl ServerSentEventHandler { pub fn new(log: Logger) -> Self { - let (attestation_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - let (block_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); - - Self { - attestation_tx, - block_tx, - finalized_tx, - head_tx, - exit_tx, - chain_reorg, - log, - } + Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY) } pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { @@ -42,7 +28,8 @@ impl ServerSentEventHandler { let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); - let (chain_reorg, _) = broadcast::channel(capacity); + let (chain_reorg_tx, _) = broadcast::channel(capacity); + let (contribution_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -50,7 +37,8 @@ impl ServerSentEventHandler { finalized_tx, head_tx, exit_tx, - chain_reorg, + chain_reorg_tx, + contribution_tx, log, } } @@ -70,8 +58,10 @@ impl ServerSentEventHandler { .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)), EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit)) .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)), - EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg)) + EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg)) .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)), + EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof)) + .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -99,7 +89,11 @@ impl ServerSentEventHandler { } pub fn subscribe_reorgs(&self) -> Receiver> { - self.chain_reorg.subscribe() + self.chain_reorg_tx.subscribe() + } + + pub fn subscribe_contributions(&self) -> Receiver> { + self.contribution_tx.subscribe() } pub fn has_attestation_subscribers(&self) -> bool { @@ -123,6 +117,10 @@ impl ServerSentEventHandler { } pub fn has_reorg_subscribers(&self) -> bool { - self.chain_reorg.receiver_count() > 0 + self.chain_reorg_tx.receiver_count() > 0 + } + + pub fn has_contribution_subscribers(&self) -> bool { + self.contribution_tx.receiver_count() > 0 } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 67adda5fa1..2eb4489f2a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -351,7 +351,7 @@ where .chain_config(chain_config) .event_handler(Some(ServerSentEventHandler::new_with_capacity( log.clone(), - 1, + 5, ))) .monitor_validators(true, vec![], log); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index be55212239..61a9cf1370 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2472,6 +2472,9 @@ pub fn serve( api_types::EventTopic::ChainReorg => { event_handler.subscribe_reorgs() } + api_types::EventTopic::ContributionAndProof => { + event_handler.subscribe_contributions() + } }; receivers.push(BroadcastStream::new(receiver).map(|msg| { diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 5ccd672ca6..2252e92b36 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1,4 +1,5 @@ use crate::common::{create_api_server, ApiServer}; +use beacon_chain::test_utils::RelativeSyncCommittee; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, @@ -50,6 +51,7 @@ struct ApiTester { next_block: SignedBeaconBlock, reorg_block: SignedBeaconBlock, attestations: Vec>, + contribution_and_proofs: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, voluntary_exit: SignedVoluntaryExit, @@ -65,10 +67,13 @@ impl ApiTester { // This allows for testing voluntary exits without building out a massive chain. let mut spec = E::default_spec(); spec.shard_committee_period = 2; + Self::new_from_spec(spec) + } + pub fn new_from_spec(spec: ChainSpec) -> Self { let harness = BeaconChainHarness::new( MainnetEthSpec, - Some(spec), + Some(spec.clone()), generate_deterministic_keypairs(VALIDATOR_COUNT), ); @@ -122,6 +127,30 @@ impl ApiTester { "precondition: attestations for testing" ); + let current_epoch = harness + .chain + .slot() + .expect("should get current slot") + .epoch(E::slots_per_epoch()); + let is_altair = spec + .altair_fork_epoch + .map(|epoch| epoch <= current_epoch) + .unwrap_or(false); + let contribution_and_proofs = if is_altair { + harness + .make_sync_contributions( + &head.beacon_state, + head_state_root, + harness.chain.slot().unwrap(), + RelativeSyncCommittee::Current, + ) + .into_iter() + .filter_map(|(_, contribution)| contribution) + .collect::>() + } else { + vec![] + }; + let attester_slashing = harness.make_attester_slashing(vec![0, 1]); let proposer_slashing = harness.make_proposer_slashing(2); let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); @@ -172,6 +201,7 @@ impl ApiTester { next_block, reorg_block, attestations, + contribution_and_proofs, attester_slashing, proposer_slashing, voluntary_exit, @@ -250,6 +280,7 @@ impl ApiTester { next_block, reorg_block, attestations, + contribution_and_proofs: vec![], attester_slashing, proposer_slashing, voluntary_exit, @@ -2325,6 +2356,40 @@ impl ApiTester { self } + pub async fn test_get_events_altair(self) -> Self { + let topics = vec![EventTopic::ContributionAndProof]; + let mut events_future = self + .client + .get_events::(topics.as_slice()) + .await + .unwrap(); + + let expected_contribution_len = self.contribution_and_proofs.len(); + + self.client + .post_validator_contribution_and_proofs(self.contribution_and_proofs.as_slice()) + .await + .unwrap(); + + let contribution_events = poll_events( + &mut events_future, + expected_contribution_len, + Duration::from_millis(10000), + ) + .await; + assert_eq!( + contribution_events.as_slice(), + self.contribution_and_proofs + .clone() + .into_iter() + .map(|contribution| EventKind::ContributionAndProof(Box::new(contribution))) + .collect::>() + .as_slice() + ); + + self + } + pub async fn test_get_events_from_genesis(self) -> Self { let topics = vec![EventTopic::Block, EventTopic::Head]; let mut events_future = self @@ -2391,6 +2456,15 @@ async fn get_events() { ApiTester::new().test_get_events().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_events_altair() { + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_spec(spec) + .test_get_events_altair() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events_from_genesis() { ApiTester::new_from_genesis() diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 6f583a3959..eeb379d367 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -770,6 +770,7 @@ pub enum EventKind { Head(SseHead), VoluntaryExit(SignedVoluntaryExit), ChainReorg(SseChainReorg), + ContributionAndProof(Box>), } impl EventKind { @@ -781,6 +782,7 @@ impl EventKind { EventKind::VoluntaryExit(_) => "voluntary_exit", EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", + EventKind::ContributionAndProof(_) => "contribution_and_proof", } } @@ -825,6 +827,11 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Voluntary Exit: {:?}", e)) })?, )), + "contribution_and_proof" => Ok(EventKind::ContributionAndProof(Box::new( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Contribution and Proof: {:?}", e)) + })?, + ))), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -846,6 +853,7 @@ pub enum EventTopic { VoluntaryExit, FinalizedCheckpoint, ChainReorg, + ContributionAndProof, } impl FromStr for EventTopic { @@ -859,6 +867,7 @@ impl FromStr for EventTopic { "voluntary_exit" => Ok(EventTopic::VoluntaryExit), "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), + "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -873,6 +882,7 @@ impl fmt::Display for EventTopic { EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), + EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), } } }