diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 27fcad7cda..0f585b7e75 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,6 +1,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::PubsubMessage; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use network::NetworkMessage; use protos::services::{ AttestationData as AttestationDataProto, ProduceAttestationDataRequest, ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse, @@ -14,6 +16,7 @@ use types::Attestation; #[derive(Clone)] pub struct AttestationServiceInstance { pub chain: Arc>, + pub network_chan: crossbeam_channel::Sender, pub log: slog::Logger, } @@ -124,7 +127,7 @@ impl AttestationService for AttestationServiceInstance { } }; - match self.chain.process_attestation(attestation) { + match self.chain.process_attestation(attestation.clone()) { Ok(_) => { // Attestation was successfully processed. info!( @@ -133,6 +136,25 @@ impl AttestationService for AttestationServiceInstance { "type" => "valid_attestation", ); + // TODO: Obtain topics from the network service properly. + let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); + let message = PubsubMessage::Attestation(attestation); + + // Publish the attestation to the p2p network via gossipsub. + self.network_chan + .send(NetworkMessage::Publish { + topics: vec![topic], + message: Box::new(message), + }) + .unwrap_or_else(|e| { + error!( + self.log, + "PublishAttestation"; + "type" => "failed to publish to gossipsub", + "error" => format!("{:?}", e) + ); + }); + resp.set_success(true); } Err(e) => { diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 95c2e29163..f2f1b2abf3 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -46,7 +46,7 @@ pub fn start_server( let beacon_block_service = { let instance = BeaconBlockServiceInstance { chain: beacon_chain.clone(), - network_chan, + network_chan: network_chan.clone(), log: log.clone(), }; create_beacon_block_service(instance) @@ -61,6 +61,7 @@ pub fn start_server( let attestation_service = { let instance = AttestationServiceInstance { chain: beacon_chain.clone(), + network_chan, log: log.clone(), }; create_attestation_service(instance)