Merge branch 'validator-enhancements' into paul-gossip-test

This commit is contained in:
Paul Hauner
2019-03-31 17:47:35 +11:00
35 changed files with 1143 additions and 501 deletions

View File

@@ -290,7 +290,7 @@ where
/// fork-choice rule).
///
/// It is important to note that the `beacon_state` returned may not match the present slot. It
/// is the state as it was when the head block was recieved, which could be some slots prior to
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> RwLockReadGuard<CheckPoint> {
self.canonical_head.read()

View File

@@ -6,7 +6,7 @@ pub mod test_utils;
pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock};
pub use self::checkpoint::CheckPoint;
pub use self::errors::BeaconChainError;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use db;
pub use fork_choice;
pub use parking_lot;

View File

@@ -50,7 +50,7 @@ impl<T: ClientDB, U: SlotClock, F: ForkChoice> DirectBeaconNode<T, U, F> {
}
impl<T: ClientDB, U: SlotClock, F: ForkChoice> AttesterBeaconNode for DirectBeaconNode<T, U, F> {
fn produce_attestation(
fn produce_attestation_data(
&self,
_slot: Slot,
shard: u64,

View File

@@ -22,13 +22,13 @@ pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exi
// build heartbeat logic here
let heartbeat = move |_| {
debug!(log, "Temp heartbeat output");
//debug!(log, "Temp heartbeat output");
//TODO: Remove this logic. Testing only
let mut count = counter.lock().unwrap();
*count += 1;
if *count % 5 == 0 {
debug!(log, "Sending Message");
// debug!(log, "Sending Message");
network.send_message();
}

View File

@@ -161,7 +161,7 @@ fn network_service(
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
// debug!(log, "Received message from notifier");
}
};
}

View File

@@ -0,0 +1,159 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{
AttestationData as AttestationDataProto, ProduceAttestationDataRequest,
ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse,
};
use protos::services_grpc::AttestationService;
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::Attestation;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: slog::Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce the `AttestationData` for signing by a validator.
fn produce_attestation_data(
&mut self,
ctx: RpcContext,
req: ProduceAttestationDataRequest,
sink: UnarySink<ProduceAttestationDataResponse>,
) {
trace!(
&self.log,
"Attempting to produce attestation at slot {}",
req.get_slot()
);
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
let state = self.chain.get_state();
// Start by performing some checks
// Check that the AttestionData is for the current slot (otherwise it will not be valid)
if slot_requested > state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
Some(format!(
"AttestationData request for a slot that is in the future."
)),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
// currently cannot handle past slots. TODO: Handle this case
else if slot_requested < state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some(format!(
"AttestationData request for a slot that is in the past."
)),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
}
// Then get the AttestationData from the beacon chain
let shard = req.get_shard();
let attestation_data = match self.chain.produce_attestation_data(shard) {
Ok(v) => v,
Err(e) => {
// Could not produce an attestation
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown,
Some(format!("Could not produce an attestation: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let mut attestation_data_proto = AttestationDataProto::new();
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
let mut resp = ProduceAttestationDataResponse::new();
resp.set_attestation_data(attestation_data_proto);
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `FreeAttestation` from the validator,
/// store it, and aggregate it into an `Attestation`.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
trace!(self.log, "Publishing attestation");
let mut resp = PublishAttestationResponse::new();
let ssz_serialized_attestation = req.get_attestation().get_ssz();
let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) {
Ok((v, _index)) => v,
Err(_) => {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid attestation".to_string()),
))
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
return ctx.spawn(f);
}
};
match self.chain.process_attestation(attestation) {
Ok(_) => {
// Attestation was successfully processed.
info!(
self.log,
"PublishAttestation";
"type" => "valid_attestation",
);
resp.set_success(true);
}
Err(e) => {
// Attestation was invalid
warn!(
self.log,
"PublishAttestation";
"type" => "invalid_attestation",
"error" => format!("{:?}", e),
);
resp.set_success(false);
resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec());
}
};
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@@ -1,61 +0,0 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse,
ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest,
PublishAttestation
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub log: Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_attestation(
&mut self,
ctx: RpcContext,
req: ProduceAttestationRequest,
sink: UnarySink<ProduceAttestationResponse>,
) {
println!("producing attestation at slot {}", req.get_slot());
// TODO: build a legit block.
let mut attestation = AttestationProto::new();
attestation.set_slot(req.get_slot());
// TODO Set the shard to something legit.
attestation.set_shard(0);
attestation.set_block_root(b"cats".to_vec());
let mut resp = ProduceAttestationResponse::new();
resp.set_attestation_data(attestation);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
println!("publishing attestation {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishAttestationResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@@ -2,7 +2,7 @@ use crate::beacon_chain::BeaconChain;
use crossbeam_channel;
use eth2_libp2p::PubsubMessage;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage;
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
@@ -10,10 +10,10 @@ use protos::services::{
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use slog::{error, info, warn};
use ssz::Decodable;
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::BeaconBlock;
use types::{BeaconBlock, Signature, Slot};
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
@@ -30,11 +30,44 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: ProduceBeaconBlockRequest,
sink: UnarySink<ProduceBeaconBlockResponse>,
) {
println!("producing at slot {}", req.get_slot());
trace!(self.log, "Generating a beacon block"; "req" => format!("{:?}", req));
// decode the request
// TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336
let _requested_slot = Slot::from(req.get_slot());
let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) {
Ok((reveal, _index)) => reveal,
Err(_) => {
// decode error, incorrect signature
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some(format!("Invalid randao reveal signature")),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let produced_block = match self.chain.produce_block(randao_reveal) {
Ok((block, _state)) => block,
Err(e) => {
// could not produce a block
let log_clone = self.log.clone();
warn!(self.log, "RPC Error"; "Error" => format!("Could not produce a block:{:?}",e));
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown,
Some(format!("Could not produce a block: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
// TODO: build a legit block.
let mut block = BeaconBlockProto::new();
block.set_ssz(b"cats".to_vec());
block.set_ssz(ssz_encode(&produced_block));
let mut resp = ProduceBeaconBlockResponse::new();
resp.set_block(block);
@@ -52,6 +85,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
trace!(&self.log, "Attempting to publish a block");
let mut resp = PublishBeaconBlockResponse::new();
let ssz_serialized_block = req.get_block().get_ssz();

View File

@@ -2,12 +2,13 @@ use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice,
parking_lot::RwLockReadGuard,
parking_lot::{RwLockReadGuard, RwLockWriteGuard},
slot_clock::SlotClock,
types::{BeaconState, ChainSpec},
types::{BeaconState, ChainSpec, Signature},
AttestationValidationError, BlockProductionError,
};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
use types::BeaconBlock;
use types::{Attestation, AttestationData, BeaconBlock};
/// The RPC's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
@@ -15,8 +16,22 @@ pub trait BeaconChain: Send + Sync {
fn get_state(&self) -> RwLockReadGuard<BeaconState>;
fn get_mut_state(&self) -> RwLockWriteGuard<BeaconState>;
fn process_block(&self, block: BeaconBlock)
-> Result<BlockProcessingOutcome, BeaconChainError>;
fn produce_block(
&self,
randao_reveal: Signature,
) -> Result<(BeaconBlock, BeaconState), BlockProductionError>;
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError>;
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
@@ -33,10 +48,32 @@ where
self.state.read()
}
fn get_mut_state(&self) -> RwLockWriteGuard<BeaconState> {
self.state.write()
}
fn process_block(
&self,
block: BeaconBlock,
) -> Result<BlockProcessingOutcome, BeaconChainError> {
self.process_block(block)
}
fn produce_block(
&self,
randao_reveal: Signature,
) -> Result<(BeaconBlock, BeaconState), BlockProductionError> {
self.produce_block(randao_reveal)
}
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError> {
self.produce_attestation_data(shard)
}
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError> {
self.process_attestation(attestation)
}
}

View File

@@ -1,19 +1,22 @@
mod attestation;
mod beacon_block;
pub mod beacon_chain;
mod beacon_node;
pub mod config;
mod validator;
use self::attestation::AttestationServiceInstance;
use self::beacon_block::BeaconBlockServiceInstance;
use self::beacon_chain::BeaconChain;
use self::beacon_node::BeaconNodeServiceInstance;
use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder};
use futures::Future;
use grpcio::{Environment, ServerBuilder};
use network::NetworkMessage;
use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service,
create_attestation_service, create_beacon_block_service, create_beacon_node_service,
create_validator_service,
};
use slog::{info, o, warn};
use std::sync::Arc;
@@ -56,11 +59,19 @@ pub fn start_server(
};
create_validator_service(instance)
};
let attestation_service = {
let instance = AttestationServiceInstance {
chain: beacon_chain.clone(),
log: log.clone(),
};
create_attestation_service(instance)
};
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.register_service(beacon_node_service)
.register_service(attestation_service)
.bind(config.listen_address.to_string(), config.port)
.build()
.unwrap();

View File

@@ -4,7 +4,7 @@ use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty};
use protos::services_grpc::ValidatorService;
use slog::{debug, info, warn, Logger};
use slog::{trace, warn};
use ssz::Decodable;
use std::sync::Arc;
use types::{Epoch, RelativeEpoch};
@@ -12,7 +12,7 @@ use types::{Epoch, RelativeEpoch};
#[derive(Clone)]
pub struct ValidatorServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: Logger,
pub log: slog::Logger,
}
//TODO: Refactor Errors
@@ -27,14 +27,13 @@ impl ValidatorService for ValidatorServiceInstance {
sink: UnarySink<GetDutiesResponse>,
) {
let validators = req.get_validators();
debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators();
trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let spec = self.chain.get_spec();
let state = self.chain.get_state();
let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators();
let relative_epoch =
match RelativeEpoch::from_epoch(state.slot.epoch(spec.slots_per_epoch), epoch) {
@@ -157,6 +156,7 @@ impl ValidatorService for ValidatorServiceInstance {
duty.set_committee_index(attestation_duties.committee_index as u64);
duty.set_attestation_slot(attestation_duties.slot.as_u64());
duty.set_attestation_shard(attestation_duties.shard);
duty.set_committee_len(attestation_duties.committee_len as u64);
active_validator.set_duty(duty);
resp_validators.push(active_validator);