Merge branch 'validator-enhancements' into testnet-client

This commit is contained in:
Paul Hauner
2019-04-02 14:29:43 +11:00
54 changed files with 1620 additions and 852 deletions

View File

@@ -0,0 +1,159 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{
AttestationData as AttestationDataProto, ProduceAttestationDataRequest,
ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse,
};
use protos::services_grpc::AttestationService;
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::Attestation;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: slog::Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce the `AttestationData` for signing by a validator.
fn produce_attestation_data(
&mut self,
ctx: RpcContext,
req: ProduceAttestationDataRequest,
sink: UnarySink<ProduceAttestationDataResponse>,
) {
trace!(
&self.log,
"Attempting to produce attestation at slot {}",
req.get_slot()
);
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
let state = self.chain.get_state();
// Start by performing some checks
// Check that the AttestionData is for the current slot (otherwise it will not be valid)
if slot_requested > state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
Some(format!(
"AttestationData request for a slot that is in the future."
)),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
// currently cannot handle past slots. TODO: Handle this case
else if slot_requested < state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some(format!(
"AttestationData request for a slot that is in the past."
)),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
}
// Then get the AttestationData from the beacon chain
let shard = req.get_shard();
let attestation_data = match self.chain.produce_attestation_data(shard) {
Ok(v) => v,
Err(e) => {
// Could not produce an attestation
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown,
Some(format!("Could not produce an attestation: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let mut attestation_data_proto = AttestationDataProto::new();
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
let mut resp = ProduceAttestationDataResponse::new();
resp.set_attestation_data(attestation_data_proto);
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `FreeAttestation` from the validator,
/// store it, and aggregate it into an `Attestation`.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
trace!(self.log, "Publishing attestation");
let mut resp = PublishAttestationResponse::new();
let ssz_serialized_attestation = req.get_attestation().get_ssz();
let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) {
Ok((v, _index)) => v,
Err(_) => {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid attestation".to_string()),
))
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
return ctx.spawn(f);
}
};
match self.chain.process_attestation(attestation) {
Ok(_) => {
// Attestation was successfully processed.
info!(
self.log,
"PublishAttestation";
"type" => "valid_attestation",
);
resp.set_success(true);
}
Err(e) => {
// Attestation was invalid
warn!(
self.log,
"PublishAttestation";
"type" => "invalid_attestation",
"error" => format!("{:?}", e),
);
resp.set_success(false);
resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec());
}
};
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@@ -1,61 +0,0 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse,
ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest,
PublishAttestation
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub log: Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_attestation(
&mut self,
ctx: RpcContext,
req: ProduceAttestationRequest,
sink: UnarySink<ProduceAttestationResponse>,
) {
println!("producing attestation at slot {}", req.get_slot());
// TODO: build a legit block.
let mut attestation = AttestationProto::new();
attestation.set_slot(req.get_slot());
// TODO Set the shard to something legit.
attestation.set_shard(0);
attestation.set_block_root(b"cats".to_vec());
let mut resp = ProduceAttestationResponse::new();
resp.set_attestation_data(attestation);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
println!("publishing attestation {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishAttestationResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@@ -1,9 +1,8 @@
use crate::beacon_chain::BeaconChain;
use crossbeam_channel;
use eth2_libp2p::rpc::methods::BlockRootSlot;
use eth2_libp2p::PubsubMessage;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage;
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
@@ -11,10 +10,10 @@ use protos::services::{
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use slog::{debug, error, info, warn};
use ssz::{Decodable, TreeHash};
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::{BeaconBlock, Hash256, Slot};
use types::{BeaconBlock, Signature, Slot};
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
@@ -31,11 +30,44 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: ProduceBeaconBlockRequest,
sink: UnarySink<ProduceBeaconBlockResponse>,
) {
println!("producing at slot {}", req.get_slot());
trace!(self.log, "Generating a beacon block"; "req" => format!("{:?}", req));
// decode the request
// TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336
let _requested_slot = Slot::from(req.get_slot());
let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) {
Ok((reveal, _index)) => reveal,
Err(_) => {
// decode error, incorrect signature
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some(format!("Invalid randao reveal signature")),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let produced_block = match self.chain.produce_block(randao_reveal) {
Ok((block, _state)) => block,
Err(e) => {
// could not produce a block
let log_clone = self.log.clone();
warn!(self.log, "RPC Error"; "Error" => format!("Could not produce a block:{:?}",e));
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown,
Some(format!("Could not produce a block: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
// TODO: build a legit block.
let mut block = BeaconBlockProto::new();
block.set_ssz(b"cats".to_vec());
block.set_ssz(ssz_encode(&produced_block));
let mut resp = ProduceBeaconBlockResponse::new();
resp.set_block(block);
@@ -53,14 +85,14 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
trace!(&self.log, "Attempting to publish a block");
let mut resp = PublishBeaconBlockResponse::new();
let ssz_serialized_block = req.get_block().get_ssz();
match BeaconBlock::ssz_decode(ssz_serialized_block, 0) {
Ok((block, _i)) => {
let block_root = Hash256::from_slice(&block.hash_tree_root()[..]);
match self.chain.process_block(block.clone()) {
Ok(outcome) => {
if outcome.sucessfully_processed() {
@@ -76,16 +108,22 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
// TODO: Obtain topics from the network service properly.
let topic =
types::TopicBuilder::new("beacon_chain".to_string()).build();
let message = PubsubMessage::Block(BlockRootSlot {
block_root,
slot: block.slot,
});
let message = PubsubMessage::Block(block);
println!("Sending beacon block to gossipsub");
self.network_chan.send(NetworkMessage::Publish {
topics: vec![topic],
message,
});
// Publish the block to the p2p network via gossipsub.
self.network_chan
.send(NetworkMessage::Publish {
topics: vec![topic],
message,
})
.unwrap_or_else(|e| {
error!(
self.log,
"PublishBeaconBlock";
"type" => "failed to publish to gossipsub",
"error" => format!("{:?}", e)
);
});
resp.set_success(true);
} else if outcome.is_invalid() {

View File

@@ -2,12 +2,13 @@ use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice,
parking_lot::RwLockReadGuard,
parking_lot::{RwLockReadGuard, RwLockWriteGuard},
slot_clock::SlotClock,
types::{BeaconState, ChainSpec},
types::{BeaconState, ChainSpec, Signature},
AttestationValidationError, BlockProductionError,
};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
use types::BeaconBlock;
use types::{Attestation, AttestationData, BeaconBlock};
/// The RPC's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
@@ -15,8 +16,22 @@ pub trait BeaconChain: Send + Sync {
fn get_state(&self) -> RwLockReadGuard<BeaconState>;
fn get_mut_state(&self) -> RwLockWriteGuard<BeaconState>;
fn process_block(&self, block: BeaconBlock)
-> Result<BlockProcessingOutcome, BeaconChainError>;
fn produce_block(
&self,
randao_reveal: Signature,
) -> Result<(BeaconBlock, BeaconState), BlockProductionError>;
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError>;
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
@@ -33,10 +48,32 @@ where
self.state.read()
}
fn get_mut_state(&self) -> RwLockWriteGuard<BeaconState> {
self.state.write()
}
fn process_block(
&self,
block: BeaconBlock,
) -> Result<BlockProcessingOutcome, BeaconChainError> {
self.process_block(block)
}
fn produce_block(
&self,
randao_reveal: Signature,
) -> Result<(BeaconBlock, BeaconState), BlockProductionError> {
self.produce_block(randao_reveal)
}
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError> {
self.produce_attestation_data(shard)
}
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError> {
self.process_attestation(attestation)
}
}

View File

@@ -1,19 +1,22 @@
mod attestation;
mod beacon_block;
pub mod beacon_chain;
mod beacon_node;
pub mod config;
mod validator;
use self::attestation::AttestationServiceInstance;
use self::beacon_block::BeaconBlockServiceInstance;
use self::beacon_chain::BeaconChain;
use self::beacon_node::BeaconNodeServiceInstance;
use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder};
use futures::Future;
use grpcio::{Environment, ServerBuilder};
use network::NetworkMessage;
use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service,
create_attestation_service, create_beacon_block_service, create_beacon_node_service,
create_validator_service,
};
use slog::{info, o, warn};
use std::sync::Arc;
@@ -56,11 +59,19 @@ pub fn start_server(
};
create_validator_service(instance)
};
let attestation_service = {
let instance = AttestationServiceInstance {
chain: beacon_chain.clone(),
log: log.clone(),
};
create_attestation_service(instance)
};
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.register_service(beacon_node_service)
.register_service(attestation_service)
.bind(config.listen_address.to_string(), config.port)
.build()
.unwrap();

View File

@@ -4,7 +4,7 @@ use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty};
use protos::services_grpc::ValidatorService;
use slog::{debug, info, warn, Logger};
use slog::{trace, warn};
use ssz::decode;
use std::sync::Arc;
use types::{Epoch, RelativeEpoch};
@@ -12,7 +12,7 @@ use types::{Epoch, RelativeEpoch};
#[derive(Clone)]
pub struct ValidatorServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: Logger,
pub log: slog::Logger,
}
//TODO: Refactor Errors
@@ -27,14 +27,13 @@ impl ValidatorService for ValidatorServiceInstance {
sink: UnarySink<GetDutiesResponse>,
) {
let validators = req.get_validators();
debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators();
trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let spec = self.chain.get_spec();
let state = self.chain.get_state();
let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators();
let relative_epoch =
match RelativeEpoch::from_epoch(state.slot.epoch(spec.slots_per_epoch), epoch) {
@@ -84,7 +83,7 @@ impl ValidatorService for ValidatorServiceInstance {
RpcStatusCode::InvalidArgument,
Some("Invalid public_key".to_string()),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}", req));
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
return ctx.spawn(f);
}
};
@@ -157,6 +156,7 @@ impl ValidatorService for ValidatorServiceInstance {
duty.set_committee_index(attestation_duties.committee_index as u64);
duty.set_attestation_slot(attestation_duties.slot.as_u64());
duty.set_attestation_shard(attestation_duties.shard);
duty.set_committee_len(attestation_duties.committee_len as u64);
active_validator.set_duty(duty);
resp_validators.push(active_validator);