From fc5142c09abbe11597f1ac302c13125a554e42a6 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 30 Mar 2019 19:32:32 +1100 Subject: [PATCH] Registers the attester service to the beacon node RPC client --- beacon_node/beacon_chain/src/lib.rs | 4 +- beacon_node/rpc/src/attestation.rs | 143 ++++++++++++++++ beacon_node/rpc/src/beacon_attester.rs | 131 --------------- beacon_node/rpc/src/beacon_block.rs | 6 +- beacon_node/rpc/src/beacon_chain.rs | 22 ++- beacon_node/rpc/src/lib.rs | 17 +- .../testing_beacon_state_builder.rs | 2 +- .../beacon_node_attestation.rs | 23 +++ .../src/attestation_producer/grpc.rs | 59 +++++++ .../src/attestation_producer/mod.rs | 155 ++++++++++++++++++ validator_client/src/service.rs | 3 + 11 files changed, 425 insertions(+), 140 deletions(-) create mode 100644 beacon_node/rpc/src/attestation.rs delete mode 100644 beacon_node/rpc/src/beacon_attester.rs create mode 100644 validator_client/src/attestation_producer/beacon_node_attestation.rs create mode 100644 validator_client/src/attestation_producer/grpc.rs create mode 100644 validator_client/src/attestation_producer/mod.rs diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 234a960945..69ff14671b 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,7 +5,9 @@ mod errors; pub mod initialise; pub mod test_utils; -pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; +pub use self::beacon_chain::{ + AttestationValidationError, BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock, +}; pub use self::checkpoint::CheckPoint; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use attestation_aggregator::Outcome as AggregationOutcome; diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs new file mode 100644 index 0000000000..f9423df615 --- /dev/null +++ b/beacon_node/rpc/src/attestation.rs @@ -0,0 +1,143 @@ +use crate::beacon_chain::BeaconChain; +use futures::Future; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use protos::services::{ + AttestationData as AttestationDataProto, ProduceAttestationDataRequest, + ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse, +}; +use protos::services_grpc::AttestationService; +use slog::{error, info, trace, warn, Logger}; +use ssz::{ssz_encode, Decodable}; +use std::sync::Arc; +use types::Attestation; + +#[derive(Clone)] +pub struct AttestationServiceInstance { + pub chain: Arc, + pub log: slog::Logger, +} + +impl AttestationService for AttestationServiceInstance { + /// Produce the `AttestationData` for signing by a validator. + fn produce_attestation_data( + &mut self, + ctx: RpcContext, + req: ProduceAttestationDataRequest, + sink: UnarySink, + ) { + warn!( + &self.log, + "Attempting to produce attestation at slot {}", + req.get_slot() + ); + + // verify the slot, drop lock on state afterwards + { + let slot_requested = req.get_slot(); + let state = self.chain.get_state(); + + // Start by performing some checks + // Check that the the AttestionData is for the current slot (otherwise it will not be valid) + if slot_requested != state.slot.as_u64() { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::OutOfRange, + Some(format!( + "AttestationData request for a slot that is not the current slot." + )), + )) + .map_err(move |e| { + error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e) + }); + return ctx.spawn(f); + } + } + + // Then get the AttestationData from the beacon chain + let shard = req.get_shard(); + let attestation_data = match self.chain.produce_attestation_data(shard) { + Ok(v) => v, + Err(e) => { + // Could not produce an attestation + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown, + Some(format!("Could not produce an attestation: {:?}", e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + let mut attestation_data_proto = AttestationDataProto::new(); + attestation_data_proto.set_ssz(ssz_encode(&attestation_data)); + + let mut resp = ProduceAttestationDataResponse::new(); + resp.set_attestation_data(attestation_data_proto); + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + /// Accept some fully-formed `FreeAttestation` from the validator, + /// store it, and aggregate it into an `Attestation`. + fn publish_attestation( + &mut self, + ctx: RpcContext, + req: PublishAttestationRequest, + sink: UnarySink, + ) { + warn!(self.log, "Publishing attestation"); + + let mut resp = PublishAttestationResponse::new(); + let ssz_serialized_attestation = req.get_attestation().get_ssz(); + + let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { + Ok((v, _index)) => v, + Err(_) => { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some("Invalid attestation".to_string()), + )) + .map_err(move |_| warn!(log_clone, "failed to reply {:?}", req)); + return ctx.spawn(f); + } + }; + + match self.chain.process_attestation(attestation) { + Ok(_) => { + // Attestation was successfully processed. + info!( + self.log, + "PublishAttestation"; + "type" => "valid_attestation", + ); + + resp.set_success(true); + } + Err(e) => { + // Attestation was invalid + warn!( + self.log, + "PublishAttestation"; + "type" => "invalid_attestation", + ); + resp.set_success(false); + resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec()); + } + }; + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/beacon_node/rpc/src/beacon_attester.rs b/beacon_node/rpc/src/beacon_attester.rs deleted file mode 100644 index d787d1e5f0..0000000000 --- a/beacon_node/rpc/src/beacon_attester.rs +++ /dev/null @@ -1,131 +0,0 @@ -use crate::beacon_chain::BeaconChain; -use futures::Future; -use grpcio::{RpcContext, UnarySink, RpcStatus, RpcStatusCode}; -use protos::services::{ - AttestationData as AttestationDataProto, ProduceAttestationData, ProduceAttestationDataResponse, - ProduceAttestationDataRequest, PublishAttestationResponse, PublishAttestationRequest, - PublishAttestation -}; -use protos::services_grpc::BeaconBlockService; -use slog::{Logger, info, warn, error, trace}; - -#[derive(Clone)] -pub struct AttestationServiceInstance { - pub chain: Arc, - pub log: Logger, -} - -impl AttestationService for AttestationServiceInstance { - /// Produce the `AttestationData` for signing by a validator. - fn produce_attestation_data( - &mut self, - ctx: RpcContext, - req: ProduceAttestationDataRequest, - sink: UnarySink, - ) { - trace!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot()); - - // verify the slot, drop lock on state afterwards - { - let slot_requested = req.get_slot(); - let state = self.chain.get_state(); - - // Start by performing some checks - // Check that the the AttestionData is for the current slot (otherwise it will not be valid) - if slot_requested != state.slot { - let f = sink - .fail(RpcStatus::new( - RpcStatusCode::OutOfRange, - "AttestationData request for a slot that is not the current slot." - )) - .map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e)); - } - } - - // Then get the AttestationData from the beacon chain - let attestation_data = match self.chain.produce_attestation_data(req.get_shard()){ - Ok(v) => v, - Err(e) => { - // Could not produce an attestation - let log_clone = self.log.clone(); - let f = sink - .fail(RpcStatus::new( - RpcStatusCode::Unknown - Some(format!("Could not produce an attestation: {:?}",e)), - )) - .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); - return ctx.spawn(f); - } - }; - - - let mut attestation_data_proto = AttestationDataProto::new(); - attestation_data_proto.set_ssz(ssz_encode(&attestation_data)); - - let mut resp = ProduceAttestationDataResponse::new(); - resp.set_attestation_data(attestation_data_proto); - - let f = sink - .success(resp) - .map_err(move |e| error!("Failed to reply with success {:?}: {:?}", req, e)); - ctx.spawn(f) - } - - /// Accept some fully-formed `FreeAttestation` from the validator, - /// store it, and aggregate it into an `Attestation`. - fn publish_attestation( - &mut self, - ctx: RpcContext, - req: PublishAttestationRequest, - sink: UnarySink, - ) { - trace!(self.log, "Publishing attestation"); - - let mut resp = PublishAttestationResponse::new(); - let ssz_serialized_attestation = req.get_attestation().get_ssz(); - - let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { - Ok((v, _index)) => v, - Err(_) => { - let log_clone = self.log.clone(); - let f = sink - .fail(RpcStatus::new( - RpcStatusCode::InvalidArgument, - Some("Invalid attestation".to_string()), - )) - .map_err(move |e| warn!(log_clone, "failed to reply {:?}", req)); - return ctx.spawn(f); - } - }; - - match self.chain.process_attestation(attestation) { - Ok(_) => { - // Attestation was successfully processed. - info!( - self.log, - "PublishAttestation"; - "type" => "valid_attestation", - ); - - resp.set_success(true); - }, - Err(e)=> { - // Attestation was invalid - warn!( - self.log, - "PublishAttestation"; - "type" => "invalid_attestation", - ); - resp.set_success(false); - resp.set_msg( - format!("InvalidAttestation: {:?}", e).as_bytes().to_vec(), - ); - } - }; - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } -} diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index e8b3cb01b4..adb0a3c22f 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -36,8 +36,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { // decode the request // TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336 let _requested_slot = Slot::from(req.get_slot()); - let (randao_reveal, _index) = match Signature::ssz_decode(req.get_randao_reveal(), 0) { - Ok(v) => v, + let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) { + Ok((reveal, _index)) => reveal, Err(_) => { // decode error, incorrect signature let log_clone = self.log.clone(); @@ -86,6 +86,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: PublishBeaconBlockRequest, sink: UnarySink, ) { + trace!(&self.log, "Attempting to publish a block"); + let mut resp = PublishBeaconBlockResponse::new(); let ssz_serialized_block = req.get_block().get_ssz(); diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index f21b8df7b4..7de48efa1c 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -5,10 +5,10 @@ use beacon_chain::{ parking_lot::RwLockReadGuard, slot_clock::SlotClock, types::{BeaconState, ChainSpec, Signature}, - BlockProductionError, + AttestationValidationError, BlockProductionError, }; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; -use types::BeaconBlock; +use types::{Attestation, AttestationData, BeaconBlock}; /// The RPC's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -23,6 +23,13 @@ pub trait BeaconChain: Send + Sync { &self, randao_reveal: Signature, ) -> Result<(BeaconBlock, BeaconState), BlockProductionError>; + + fn produce_attestation_data(&self, shard: u64) -> Result; + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError>; } impl BeaconChain for RawBeaconChain @@ -52,4 +59,15 @@ where ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { self.produce_block(randao_reveal) } + + fn produce_attestation_data(&self, shard: u64) -> Result { + self.produce_attestation_data(shard) + } + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError> { + self.process_attestation(attestation) + } } diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 2d47b4a69a..5aac4ce558 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -1,19 +1,22 @@ +mod attestation; mod beacon_block; pub mod beacon_chain; mod beacon_node; pub mod config; mod validator; +use self::attestation::AttestationServiceInstance; use self::beacon_block::BeaconBlockServiceInstance; use self::beacon_chain::BeaconChain; use self::beacon_node::BeaconNodeServiceInstance; use self::validator::ValidatorServiceInstance; pub use config::Config as RPCConfig; -use futures::{future, Future}; -use grpcio::{Environment, Server, ServerBuilder}; +use futures::Future; +use grpcio::{Environment, ServerBuilder}; use network::NetworkMessage; use protos::services_grpc::{ - create_beacon_block_service, create_beacon_node_service, create_validator_service, + create_attestation_service, create_beacon_block_service, create_beacon_node_service, + create_validator_service, }; use slog::{info, o, warn}; use std::sync::Arc; @@ -56,11 +59,19 @@ pub fn start_server( }; create_validator_service(instance) }; + let attestation_service = { + let instance = AttestationServiceInstance { + chain: beacon_chain.clone(), + log: log.clone(), + }; + create_attestation_service(instance) + }; let mut server = ServerBuilder::new(env) .register_service(beacon_block_service) .register_service(validator_service) .register_service(beacon_node_service) + .register_service(attestation_service) .bind(config.listen_address.to_string(), config.port) .build() .unwrap(); diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index 7c231b20b0..8d0aa6af62 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1553918534; // arbitrary + let genesis_time = 1553932445; // arbitrary let mut state = BeaconState::genesis( genesis_time, diff --git a/validator_client/src/attestation_producer/beacon_node_attestation.rs b/validator_client/src/attestation_producer/beacon_node_attestation.rs new file mode 100644 index 0000000000..b5ff777de8 --- /dev/null +++ b/validator_client/src/attestation_producer/beacon_node_attestation.rs @@ -0,0 +1,23 @@ +//TODO: generalise these enums to the crate +use crate::block_producer::{BeaconNodeError, PublishOutcome}; +use types::{Attestation, AttestationData, Slot}; + +/// Defines the methods required to produce and publish attestations on a Beacon Node. Abstracts the +/// actual beacon node. +pub trait BeaconNodeAttestation: Send + Sync { + /// Request that the node produces the required attestation data. + /// + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result; + + /// Request that the node publishes a attestation. + /// + /// Returns `true` if the publish was successful. + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result; +} diff --git a/validator_client/src/attestation_producer/grpc.rs b/validator_client/src/attestation_producer/grpc.rs new file mode 100644 index 0000000000..49c577e242 --- /dev/null +++ b/validator_client/src/attestation_producer/grpc.rs @@ -0,0 +1,59 @@ +use super::beacon_node_attestation::BeaconNodeAttestation; +use crate::block_producer::{BeaconNodeError, PublishOutcome}; +use protos::services_grpc::AttestationServiceClient; +use ssz::{ssz_encode, Decodable}; + +use protos::services::{ + Attestation as GrpcAttestation, ProduceAttestationDataRequest, PublishAttestationRequest, +}; +use types::{Attestation, AttestationData, Slot}; + +impl BeaconNodeAttestation for AttestationServiceClient { + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result { + let mut req = ProduceAttestationDataRequest::new(); + req.set_slot(slot.as_u64()); + req.set_shard(shard); + + let reply = self + .produce_attestation_data(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + dbg!("Produced Attestation Data"); + + let (attestation_data, _index) = + AttestationData::ssz_decode(reply.get_attestation_data().get_ssz(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + Ok(attestation_data) + } + + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result { + let mut req = PublishAttestationRequest::new(); + + let ssz = ssz_encode(&attestation); + + let mut grpc_attestation = GrpcAttestation::new(); + grpc_attestation.set_ssz(ssz); + + req.set_attestation(grpc_attestation); + + let reply = self + .publish_attestation(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.get_success() { + Ok(PublishOutcome::Valid) + } else { + // TODO: distinguish between different errors + Ok(PublishOutcome::InvalidAttestation( + "Publish failed".to_string(), + )) + } + } +} diff --git a/validator_client/src/attestation_producer/mod.rs b/validator_client/src/attestation_producer/mod.rs new file mode 100644 index 0000000000..7b2174b0cd --- /dev/null +++ b/validator_client/src/attestation_producer/mod.rs @@ -0,0 +1,155 @@ +mod beacon_node_attestation; +mod grpc; + +use std::sync::Arc; +use types::{BeaconBlock, ChainSpec, Domain, Fork, Slot}; +//TODO: Move these higher up in the crate +use super::block_producer::{BeaconNodeError, ValidatorEvent}; +use crate::signer::Signer; +use beacon_node_attestation::BeaconNodeAttestation; +use slog::{error, info, warn}; +use ssz::TreeHash; +use types::{ + AggregateSignature, Attestation, AttestationData, AttestationDataAndCustodyBit, + AttestationDuty, Bitfield, +}; + +//TODO: Group these errors at a crate level +#[derive(Debug, PartialEq)] +pub enum Error { + BeaconNodeError(BeaconNodeError), +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +/// This struct contains the logic for requesting and signing beacon attestations for a validator. The +/// validator can abstractly sign via the Signer trait object. +pub struct AttestationProducer<'a, B: BeaconNodeAttestation, S: Signer> { + /// The current fork. + pub fork: Fork, + /// The attestation duty to perform. + pub duty: AttestationDuty, + /// The current epoch. + pub spec: Arc, + /// The beacon node to connect to. + pub beacon_node: Arc, + /// The signer to sign the block. + pub signer: &'a S, +} + +impl<'a, B: BeaconNodeAttestation, S: Signer> AttestationProducer<'a, B, S> { + /// Handle outputs and results from attestation production. + pub fn handle_produce_attestation(&mut self, log: slog::Logger) { + match self.produce_attestation() { + Ok(ValidatorEvent::AttestationProduced(_slot)) => { + info!(log, "Attestation produced"; "Validator" => format!("{}", self.signer)) + } + Err(e) => error!(log, "Attestation production error"; "Error" => format!("{:?}", e)), + Ok(ValidatorEvent::SignerRejection(_slot)) => { + error!(log, "Attestation production error"; "Error" => format!("Signer could not sign the attestation")) + } + Ok(ValidatorEvent::SlashableAttestationNotProduced(_slot)) => { + error!(log, "Attestation production error"; "Error" => format!("Rejected the attestation as it could have been slashed")) + } + Ok(ValidatorEvent::BeaconNodeUnableToProduceAttestation(_slot)) => { + error!(log, "Attestation production error"; "Error" => format!("Beacon node was unable to produce an attestation")) + } + Ok(v) => { + warn!(log, "Unknown result for attestation production"; "Error" => format!("{:?}",v)) + } + } + } + + /// Produce an attestation, sign it and send it back + /// + /// Assumes that an attestation is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. + pub fn produce_attestation(&mut self) -> Result { + let epoch = self.duty.slot.epoch(self.spec.slots_per_epoch); + + let attestation = self + .beacon_node + .produce_attestation_data(self.duty.slot, self.duty.shard)?; + if self.safe_to_produce(&attestation) { + let domain = self.spec.get_domain(epoch, Domain::Attestation, &self.fork); + if let Some(attestation) = self.sign_attestation(attestation, self.duty, domain) { + self.beacon_node.publish_attestation(attestation)?; + Ok(ValidatorEvent::AttestationProduced(self.duty.slot)) + } else { + Ok(ValidatorEvent::SignerRejection(self.duty.slot)) + } + } else { + Ok(ValidatorEvent::SlashableAttestationNotProduced( + self.duty.slot, + )) + } + } + + /// Consumes an attestation, returning the attestation signed by the validators private key. + /// + /// Important: this function will not check to ensure the attestation is not slashable. This must be + /// done upstream. + fn sign_attestation( + &mut self, + mut attestation: AttestationData, + duties: AttestationDuty, + domain: u64, + ) -> Option { + self.store_produce(&attestation); + + // build the aggregate signature + let aggregate_signature = { + let message = AttestationDataAndCustodyBit { + data: attestation.clone(), + custody_bit: false, + } + .hash_tree_root(); + + let sig = self.signer.sign_message(&message, domain)?; + + let mut agg_sig = AggregateSignature::new(); + agg_sig.add(&sig); + agg_sig + }; + + let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len); + let custody_bitfield = Bitfield::with_capacity(duties.committee_len); + aggregation_bitfield.set(duties.committee_index, true); + + Some(Attestation { + aggregation_bitfield, + data: attestation, + custody_bitfield, + aggregate_signature, + }) + } + + /// Returns `true` if signing an attestation is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _attestation: &AttestationData) -> bool { + //TODO: Implement slash protection + true + } + + /// Record that an attestation was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn store_produce(&mut self, _attestation: &AttestationData) { + // TODO: Implement slash protection + } +} diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index ba0795ec10..91c91e0b10 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -290,6 +290,7 @@ impl Service { // the return value is a future which returns ready. // built to be compatible with the tokio runtime. let _empty = cloned_manager.run_update(current_epoch.clone(), cloned_log.clone()); + dbg!("Duties Thread Ended"); }); } @@ -317,6 +318,7 @@ impl Service { signer, }; block_producer.handle_produce_block(log); + dbg!("Block produce Thread Ended"); }); } if work_type.attestation_duty.is_some() { @@ -338,6 +340,7 @@ impl Service { signer, }; attestation_producer.handle_produce_attestation(log); + dbg!("Attestation Thread Ended"); }); } }