diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index a279b3e4e1..ddef39bc52 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -49,7 +49,7 @@ impl NetworkBehaviourEventProcess { trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); - let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) { + let pubsub_message = match PubsubMessage::from_ssz_bytes(&gs_msg.data) { //TODO: Punish peer on error Err(e) => { warn!( @@ -59,7 +59,7 @@ impl NetworkBehaviourEventProcess msg, + Ok(msg) => msg, }; self.events.push(BehaviourEvent::GossipMessage { @@ -198,33 +198,58 @@ pub enum PubsubMessage { //TODO: Correctly encode/decode enums. Prefixing with integer for now. impl Encodable for PubsubMessage { - fn ssz_append(&self, s: &mut SszStream) { + fn is_ssz_fixed_len() -> bool { + false + } + + fn ssz_append(&self, buf: &mut Vec) { + let offset = ::ssz_fixed_len() + as Encodable>::ssz_fixed_len(); + + let mut encoder = ssz::SszEncoder::container(buf, offset); + match self { PubsubMessage::Block(block_gossip) => { - 0u32.ssz_append(s); - block_gossip.ssz_append(s); + encoder.append(&0_u32); + + // Encode the gossip as a Vec; + encoder.append(&block_gossip.as_ssz_bytes()); } PubsubMessage::Attestation(attestation_gossip) => { - 1u32.ssz_append(s); - attestation_gossip.ssz_append(s); + encoder.append(&1_u32); + + // Encode the gossip as a Vec; + encoder.append(&attestation_gossip.as_ssz_bytes()); } } + + encoder.finalize(); } } impl Decodable for PubsubMessage { - fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> { - let (id, index) = u32::ssz_decode(bytes, index)?; + fn is_ssz_fixed_len() -> bool { + false + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + let mut builder = ssz::SszDecoderBuilder::new(&bytes); + + builder.register_type::()?; + builder.register_type::>()?; + + let mut decoder = builder.build()?; + + let id: u32 = decoder.decode_next()?; + let body: Vec = decoder.decode_next()?; + match id { - 0 => { - let (block, index) = BeaconBlock::ssz_decode(bytes, index)?; - Ok((PubsubMessage::Block(block), index)) - } - 1 => { - let (attestation, index) = Attestation::ssz_decode(bytes, index)?; - Ok((PubsubMessage::Attestation(attestation), index)) - } - _ => Err(DecodeError::Invalid), + 0 => Ok(PubsubMessage::Block(BeaconBlock::from_ssz_bytes(&body)?)), + 1 => Ok(PubsubMessage::Attestation(Attestation::from_ssz_bytes( + &body, + )?)), + _ => Err(DecodeError::BytesInvalid( + "Invalid PubsubMessage id".to_string(), + )), } } } @@ -240,9 +265,7 @@ mod test { let encoded = ssz_encode(&original); - println!("{:?}", encoded); - - let (decoded, _i) = PubsubMessage::ssz_decode(&encoded, 0).unwrap(); + let decoded = PubsubMessage::from_ssz_bytes(&encoded).unwrap(); assert_eq!(original, decoded); } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 02d774d9e1..5d33f4e376 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -115,42 +115,40 @@ where } } +// NOTE! +// +// This code has not been tested, it is a placeholder until we can update to the new libp2p +// spec. fn decode(packet: Vec) -> Result { - // decode the header of the rpc - // request/response - let (request, index) = bool::ssz_decode(&packet, 0)?; - let (id, index) = RequestId::ssz_decode(&packet, index)?; - let (method_id, index) = u16::ssz_decode(&packet, index)?; + let mut builder = ssz::SszDecoderBuilder::new(&packet); + + builder.register_type::()?; + builder.register_type::()?; + builder.register_type::()?; + builder.register_type::>()?; + + let mut decoder = builder.build()?; + + let request: bool = decoder.decode_next()?; + let id: RequestId = decoder.decode_next()?; + let method_id: u16 = decoder.decode_next()?; + let bytes: Vec = decoder.decode_next()?; if request { let body = match RPCMethod::from(method_id) { - RPCMethod::Hello => { - let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?; - RPCRequest::Hello(hello_body) - } - RPCMethod::Goodbye => { - let (goodbye_reason, _index) = GoodbyeReason::ssz_decode(&packet, index)?; - RPCRequest::Goodbye(goodbye_reason) - } + RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&bytes)?), + RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&bytes)?), RPCMethod::BeaconBlockRoots => { - let (block_roots_request, _index) = - BeaconBlockRootsRequest::ssz_decode(&packet, index)?; - RPCRequest::BeaconBlockRoots(block_roots_request) + RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&bytes)?) } RPCMethod::BeaconBlockHeaders => { - let (block_headers_request, _index) = - BeaconBlockHeadersRequest::ssz_decode(&packet, index)?; - RPCRequest::BeaconBlockHeaders(block_headers_request) + RPCRequest::BeaconBlockHeaders(BeaconBlockHeadersRequest::from_ssz_bytes(&bytes)?) } RPCMethod::BeaconBlockBodies => { - let (block_bodies_request, _index) = - BeaconBlockBodiesRequest::ssz_decode(&packet, index)?; - RPCRequest::BeaconBlockBodies(block_bodies_request) + RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&bytes)?) } RPCMethod::BeaconChainState => { - let (chain_state_request, _index) = - BeaconChainStateRequest::ssz_decode(&packet, index)?; - RPCRequest::BeaconChainState(chain_state_request) + RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&bytes)?) } RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; @@ -164,29 +162,24 @@ fn decode(packet: Vec) -> Result { // we have received a response else { let result = match RPCMethod::from(method_id) { - RPCMethod::Hello => { - let (body, _index) = HelloMessage::ssz_decode(&packet, index)?; - RPCResponse::Hello(body) - } - RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"), + RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&bytes)?), RPCMethod::BeaconBlockRoots => { - let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?; - RPCResponse::BeaconBlockRoots(body) + RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&bytes)?) } RPCMethod::BeaconBlockHeaders => { - let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?; - RPCResponse::BeaconBlockHeaders(body) + RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse::from_ssz_bytes(&bytes)?) } RPCMethod::BeaconBlockBodies => { - let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?; - RPCResponse::BeaconBlockBodies(body) + RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse::from_ssz_bytes(&packet)?) } RPCMethod::BeaconChainState => { - let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?; - RPCResponse::BeaconChainState(body) + RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&packet)?) } + // We should never receive a goodbye response; it is invalid. + RPCMethod::Goodbye => return Err(DecodeError::UnknownRPCMethod), RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; + Ok(RPCEvent::Response { id, method_id, @@ -211,34 +204,50 @@ where } impl Encodable for RPCEvent { - fn ssz_append(&self, s: &mut SszStream) { + fn is_ssz_fixed_len() -> bool { + false + } + + // NOTE! + // + // This code has not been tested, it is a placeholder until we can update to the new libp2p + // spec. + fn ssz_append(&self, buf: &mut Vec) { + let offset = ::ssz_fixed_len() + + ::ssz_fixed_len() + + as Encodable>::ssz_fixed_len(); + + let mut encoder = ssz::SszEncoder::container(buf, offset); + match self { RPCEvent::Request { id, method_id, body, } => { - s.append(&true); - s.append(id); - s.append(method_id); + encoder.append(&true); + encoder.append(id); + encoder.append(method_id); + + // Encode the `body` as a `Vec`. match body { RPCRequest::Hello(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } RPCRequest::Goodbye(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } RPCRequest::BeaconBlockRoots(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } RPCRequest::BeaconBlockHeaders(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } RPCRequest::BeaconBlockBodies(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } RPCRequest::BeaconChainState(body) => { - s.append(body); + encoder.append(&body.as_ssz_bytes()); } } } @@ -247,28 +256,32 @@ impl Encodable for RPCEvent { method_id, result, } => { - s.append(&false); - s.append(id); - s.append(method_id); + encoder.append(&true); + encoder.append(id); + encoder.append(method_id); + match result { RPCResponse::Hello(response) => { - s.append(response); + encoder.append(&response.as_ssz_bytes()); } RPCResponse::BeaconBlockRoots(response) => { - s.append(response); + encoder.append(&response.as_ssz_bytes()); } RPCResponse::BeaconBlockHeaders(response) => { - s.append(response); + encoder.append(&response.as_ssz_bytes()); } RPCResponse::BeaconBlockBodies(response) => { - s.append(response); + encoder.append(&response.as_ssz_bytes()); } RPCResponse::BeaconChainState(response) => { - s.append(response); + encoder.append(&response.as_ssz_bytes()); } } } } + + // Finalize the encoder, writing to `buf`. + encoder.finalize(); } } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 30f04afbac..0b79c2cee0 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -110,8 +110,8 @@ impl AttestationService for AttestationServiceInstance { let mut resp = PublishAttestationResponse::new(); let ssz_serialized_attestation = req.get_attestation().get_ssz(); - let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { - Ok((v, _index)) => v, + let attestation = match Attestation::from_ssz_bytes(ssz_serialized_attestation) { + Ok(v) => v, Err(_) => { let log_clone = self.log.clone(); let f = sink diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 727da9542e..12e08728b6 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -35,8 +35,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { // decode the request // TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336 let _requested_slot = Slot::from(req.get_slot()); - let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) { - Ok((reveal, _index)) => reveal, + let randao_reveal = match Signature::from_ssz_bytes(req.get_randao_reveal()) { + Ok(reveal) => reveal, Err(_) => { // decode error, incorrect signature let log_clone = self.log.clone(); @@ -91,8 +91,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { let ssz_serialized_block = req.get_block().get_ssz(); - match BeaconBlock::ssz_decode(ssz_serialized_block, 0) { - Ok((block, _i)) => { + match BeaconBlock::from_ssz_bytes(ssz_serialized_block) { + Ok(block) => { match self.chain.process_block(block.clone()) { Ok(outcome) => { if outcome.sucessfully_processed() { diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index e2f8d098f6..58de39dc76 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -5,7 +5,7 @@ use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty}; use protos::services_grpc::ValidatorService; use slog::{trace, warn}; -use ssz::decode; +use ssz::Decodable; use std::sync::Arc; use types::{Epoch, EthSpec, RelativeEpoch}; @@ -74,7 +74,7 @@ impl ValidatorService for ValidatorServiceInstance { for validator_pk in validators.get_public_keys() { let mut active_validator = ActiveValidator::new(); - let public_key = match decode::(validator_pk) { + let public_key = match PublicKey::from_ssz_bytes(validator_pk) { Ok(v) => v, Err(_) => { let log_clone = self.log.clone(); diff --git a/eth2/utils/bls/src/fake_aggregate_signature.rs b/eth2/utils/bls/src/fake_aggregate_signature.rs index 0ea41b1a25..26ec75d930 100644 --- a/eth2/utils/bls/src/fake_aggregate_signature.rs +++ b/eth2/utils/bls/src/fake_aggregate_signature.rs @@ -80,7 +80,7 @@ impl<'de> Deserialize<'de> for FakeAggregateSignature { D: Deserializer<'de>, { let bytes = deserializer.deserialize_str(PrefixedHexVisitor)?; - let (obj, _) = <_>::ssz_decode(&bytes[..], 0) + let obj = <_>::from_ssz_bytes(&bytes[..]) .map_err(|e| serde::de::Error::custom(format!("invalid ssz ({:?})", e)))?; Ok(obj) } @@ -103,7 +103,7 @@ mod tests { original.add(&Signature::new(&[42, 42], 0, &keypair.sk)); let bytes = ssz_encode(&original); - let (decoded, _) = FakeAggregateSignature::ssz_decode(&bytes, 0).unwrap(); + let decoded = FakeAggregateSignature::from_ssz_bytes(&bytes).unwrap(); assert_eq!(original, decoded); } diff --git a/eth2/utils/bls/src/fake_signature.rs b/eth2/utils/bls/src/fake_signature.rs index 40347c49d0..7fb52cdbfc 100644 --- a/eth2/utils/bls/src/fake_signature.rs +++ b/eth2/utils/bls/src/fake_signature.rs @@ -75,7 +75,7 @@ impl<'de> Deserialize<'de> for FakeSignature { D: Deserializer<'de>, { let bytes = deserializer.deserialize_str(HexVisitor)?; - let (pubkey, _) = <_>::ssz_decode(&bytes[..], 0) + let pubkey = <_>::from_ssz_bytes(&bytes[..]) .map_err(|e| serde::de::Error::custom(format!("invalid ssz ({:?})", e)))?; Ok(pubkey) } @@ -94,7 +94,7 @@ mod tests { let original = FakeSignature::new(&[42, 42], 0, &keypair.sk); let bytes = ssz_encode(&original); - let (decoded, _) = FakeSignature::ssz_decode(&bytes, 0).unwrap(); + let decoded = FakeSignature::from_ssz_bytes(&bytes).unwrap(); assert_eq!(original, decoded); } diff --git a/eth2/utils/ssz/src/encode.rs b/eth2/utils/ssz/src/encode.rs index acc15bed2f..852716b0d6 100644 --- a/eth2/utils/ssz/src/encode.rs +++ b/eth2/utils/ssz/src/encode.rs @@ -25,6 +25,10 @@ pub trait Encodable { } } +/// Allow for encoding an ordered series of distinct or indistinct objects as SSZ bytes. +/// +/// **You must call `finalize(..)` after the final `append(..)` call** to ensure the bytes are +/// written to `buf`. pub struct SszEncoder<'a> { offset: usize, buf: &'a mut Vec,