Add snappy compression support (#866)

* notes from call

* should_forward function

* should_forward_block cleanup

* some cleanup and notes

* complete draft

* corrections

* some cleanup

* ran cargo fmt

* Revert "ran cargo fmt"

This reverts commit 464a5c4b62.

* ran cargo fmt after updating

* proposer index mods

* fmt

* new way of reading state

* fmt

* fmt

* compiles again

* fmt

* Correct stream timeout delay queue handling

* Correct small typo

* Support BlocksByRange step parameter

* Add initial docs to syncing

* Initial start of naive-attestation-aggregation

* Remove ping protocol

* Initial renaming of network services

* Correct rebasing relative to latest master

* Adds HashMapDelay struct to utils

* Initial network restructure

* Network restructure. Adds new types for v0.2.0

* Removes build artefacts

* Shift validation to beacon chain

* Temporarily remove gossip validation

This is to be updated to match current optimisation efforts.

* Adds AggregateAndProof

* Begin rebuilding pubsub encoding/decoding

* Temp commit

* Shift gossipsub decoding in eth2_libp2p

* Shifts block encoding/decoding into RPC

* Progress on attestation service

* Initial work on removing libp2p lock

* Add LRU caches to store (rollup)

* Update attestation validation for DB changes (WIP)

* Initial version of should_forward_block

* Scaffold

* Progress on attestation validation

Also, consolidate prod+testing slot clocks so that they share much
of the same implementation and can both handle sub-slot time changes.

* Removes lock from libp2p service

* Completed network lock removal

* Finish(?) attestation processing

* Correct network termination future

* Add slot check to block check

* Correct fmt issues

* Remove Drop implementation for network service

* Address reviewers suggestions

* Modification of validator for subscriptions

* Add slot signing to validator client

* Further progress on validation subscription

* Register SSZ snappy protocol messages

* Add initial idea for a snappy-compressed codec by replacing the UVI codec with the snap library

* Fix matching against protocol string

* Adds TODOs for implementation work

* Implements check against max packet size before attempting to decode snappy-compressed packets

* Add ssz_snappy codec

* Adds necessary validator subscription functionality

* Progress on snappy codec

* Clean up validator <-> beacon node http types

* Add aggregator status to ValidatorDuty

* clear buffer after decoding

* Impl Clone for manual slot clock

* Fix minor errors

* Further progress on validator client subscription

* Initial subscription and aggregation handling

* Progress to modifying val client for attestation aggregation

* First draft of validator client upgrade for aggregate attestations

* Trying something

* Length prefix compressed data

* Fix gossipsub tests

* Working snappy frames with compressed length prefix

* Removes lock on a network channel

* Partially implement beacon node subscription http api

* Uncompressed length prefix working

* Cleanup

* Remove Testing request and response

* Return codec from match statement; reduce code duplication

* Fix unsafe unwrap in Outbound decode

* Add length checks

* All encode/decode functions use snappy frame format

* Add a `full` BeaconBlock method

* Add executable to test rpc against other impls

* Remove unused code; minor fixes

* Add PH & MS slot clock changes

* Account for genesis time

* Use checked mul

* Account for genesis slot

* Change API

* Refactor "duration to..." functions

* Re-merge updated block processing to v0.2.0 (#962)

* Start updating types

* WIP

* Signature hacking

* Existing EF tests passing with fake_crypto

* Updates

* Delete outdated API spec

* The refactor continues

* It compiles

* WIP test fixes

* All release tests passing bar genesis state parsing

* Update and test YamlConfig

* Update to spec v0.10 compatible BLS

* Updates to BLS EF tests

* Add EF test for AggregateVerify

And delete unused hash2curve tests for uncompressed points

* Update EF tests to v0.10.1

* Use optional block root correctly in block proc

* Use genesis fork in deposit domain. All tests pass

* Cargo fmt

* Fast aggregate verify test

* Update REST API docs

* Cargo fmt

* Fix unused import

* Bump spec tags to v0.10.1

* Add `seconds_per_eth1_block` to chainspec

* Update to timestamp based eth1 voting scheme

* Return None from `get_votes_to_consider` if block cache is empty

* Handle overflows in `is_candidate_block`

* Revert to failing tests

* Fix eth1 data sets test

* Choose default vote according to spec

* Fix collect_valid_votes tests

* Fix `get_votes_to_consider` to choose all eligible blocks

* Uncomment winning_vote tests

* Add comments; remove unused code

* Reduce seconds_per_eth1_block for simulation

* Addressed review comments

* Add test for default vote case

* Fix logs

* Remove unused functions

* Meter default eth1 votes

* Fix comments

* Address review comments; remove unused dependency

* Add first attempt at attestation proc. re-write

* Add version 2 of attestation processing

* Minor fixes

* Add validator pubkey cache

* Make get_indexed_attestation take a committee

* Link signature processing into new attn verification

* First working version

* Ensure pubkey cache is updated

* Add more metrics, slight optimizations

* Clone committee cache during attestation processing

* Update shuffling cache during block processing

* Remove old commented-out code

* Fix shuffling cache insert bug

* Used indexed attestation in fork choice

* Restructure attn processing, add metrics

* Add more detailed metrics

* Tidy, fix failing tests

* Fix failing tests, tidy

* Disable/delete two outdated tests

* Add new Pubkeys struct to signature_sets

* Refactor with functional approach

* Update beacon chain

* Remove decompressed member from pubkey bytes

* Add hashmap for indices lookup

* Add state cache, remove store cache

* Only build the head committee cache

* Change `get_attesting_indices` to use Vec

* Fix failing test

* Tidy

* Add pubkey cache persistence file

* Add more comments

* Integrate persistence file into builder

* Add pubkey cache tests

* Add data_dir to beacon chain builder

* Remove Option in pubkey cache persistence file

* Ensure consistency between datadir/data_dir

* Fix failing network test

* Tidy

* Fix todos

* Improve tests

* Fix compile error

* Fix compile error from merge

* Split up block processing metrics

* Tidy

* Refactor get_pubkey_from_state

* Remove commented-out code

* Rename state_cache -> checkpoint_cache

* Rename Checkpoint -> Snapshot

* Tidy, add comments

* Tidy up find_head function

* Change some checkpoint -> snapshot

* Add tests

* Expose max_len

* Remove dead code

* Tidy

* Fix bug

* Add sync-speed metric

* Add first attempt at VerifiableBlock

* Start integrating into beacon chain

* Integrate VerifiableBlock

* Rename VerifiableBlock -> PartialBlockVerification

* Add start of typed methods

* Add progress

* Add further progress

* Rename structs

* Add full block verification to block_processing.rs

* Further beacon chain integration

* Update checks for gossip

* Add todo

* Start adding segment verification

* Add passing chain segment test

* Initial integration with batch sync

* Minor changes

* Tidy, add more error checking

* Start adding chain_segment tests

* Finish invalid signature tests

* Include single and gossip verified blocks in tests

* Add gossip verification tests

* Start adding docs

* Finish adding comments to block_processing.rs

* Rename block_processing.rs -> block_verification

* Start removing old block processing code

* Fixes beacon_chain compilation

* Fix project-wide compile errors

* Remove old code

* Fix bug with beacon proposer index

* Fix shim for BlockProcessingError

* Only process one epoch at a time

* Fix loop in chain segment processing

* Add caching for state.eth1_data_votes

* Add BeaconChain::validator_pubkey

* Revert "Add caching for state.eth1_data_votes"

This reverts commit cd73dcd643.

* Add sync-speed metric (#898)

* Add PH & MS slot clock changes

* Account for genesis time

* Use checked mul

* Account for genesis slot

* Change API

* Allow for clock disparity

* Refactor "duration to..." functions

* Ensure errors are returned during batch processing

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>

* Enr fork (#967)

* Start fixing enr-fork-id

* Fix time-until-next-fork logic

* Remove fork crate

* Update references to beacon_chain.spec.milliseconds_per_slot to beacon_chain.slot_clock.slot_duration().as_millis() in the attestation service (#968)

* Process network attestations (#966)

* Start updating types

* WIP

* Signature hacking

* Existing EF tests passing with fake_crypto

* Updates

* Delete outdated API spec

* The refactor continues

* It compiles

* WIP test fixes

* All release tests passing bar genesis state parsing

* Update and test YamlConfig

* Update to spec v0.10 compatible BLS

* Updates to BLS EF tests

* Add EF test for AggregateVerify

And delete unused hash2curve tests for uncompressed points

* Update EF tests to v0.10.1

* Use optional block root correctly in block proc

* Use genesis fork in deposit domain. All tests pass

* Cargo fmt

* Fast aggregate verify test

* Update REST API docs

* Cargo fmt

* Fix unused import

* Bump spec tags to v0.10.1

* Add `seconds_per_eth1_block` to chainspec

* Update to timestamp based eth1 voting scheme

* Return None from `get_votes_to_consider` if block cache is empty

* Handle overflows in `is_candidate_block`

* Revert to failing tests

* Fix eth1 data sets test

* Choose default vote according to spec

* Fix collect_valid_votes tests

* Fix `get_votes_to_consider` to choose all eligible blocks

* Uncomment winning_vote tests

* Add comments; remove unused code

* Reduce seconds_per_eth1_block for simulation

* Addressed review comments

* Add test for default vote case

* Fix logs

* Remove unused functions

* Meter default eth1 votes

* Fix comments

* Address review comments; remove unused dependency

* Add first attempt at attestation proc. re-write

* Add version 2 of attestation processing

* Minor fixes

* Add validator pubkey cache

* Make get_indexed_attestation take a committee

* Link signature processing into new attn verification

* First working version

* Ensure pubkey cache is updated

* Add more metrics, slight optimizations

* Clone committee cache during attestation processing

* Update shuffling cache during block processing

* Remove old commented-out code

* Fix shuffling cache insert bug

* Used indexed attestation in fork choice

* Restructure attn processing, add metrics

* Add more detailed metrics

* Tidy, fix failing tests

* Fix failing tests, tidy

* Disable/delete two outdated tests

* Add new Pubkeys struct to signature_sets

* Refactor with functional approach

* Update beacon chain

* Remove decompressed member from pubkey bytes

* Add hashmap for indices lookup

* Add state cache, remove store cache

* Only build the head committee cache

* Change `get_attesting_indices` to use Vec

* Fix failing test

* Tidy

* Add pubkey cache persistence file

* Add more comments

* Integrate persistence file into builder

* Add pubkey cache tests

* Add data_dir to beacon chain builder

* Remove Option in pubkey cache persistence file

* Ensure consistency between datadir/data_dir

* Fix failing network test

* Tidy

* Fix todos

* Improve tests

* Fix compile error

* Fix compile error from merge

* Split up block processing metrics

* Tidy

* Refactor get_pubkey_from_state

* Remove commented-out code

* Rename state_cache -> checkpoint_cache

* Rename Checkpoint -> Snapshot

* Tidy, add comments

* Tidy up find_head function

* Change some checkpoint -> snapshot

* Add tests

* Expose max_len

* Remove dead code

* Tidy

* Fix bug

* Add sync-speed metric

* Add first attempt at VerifiableBlock

* Start integrating into beacon chain

* Integrate VerifiableBlock

* Rename VerifiableBlock -> PartialBlockVerification

* Add start of typed methods

* Add progress

* Add further progress

* Rename structs

* Add full block verification to block_processing.rs

* Further beacon chain integration

* Update checks for gossip

* Add todo

* Start adding segment verification

* Add passing chain segment test

* Initial integration with batch sync

* Minor changes

* Tidy, add more error checking

* Start adding chain_segment tests

* Finish invalid signature tests

* Include single and gossip verified blocks in tests

* Add gossip verification tests

* Start adding docs

* Finish adding comments to block_processing.rs

* Rename block_processing.rs -> block_verification

* Start removing old block processing code

* Fixes beacon_chain compilation

* Fix project-wide compile errors

* Remove old code

* Fix bug with beacon proposer index

* Fix shim for BlockProcessingError

* Only process one epoch at a time

* Fix loop in chain segment processing

* Add caching for state.eth1_data_votes

* Add BeaconChain::validator_pubkey

* Revert "Add caching for state.eth1_data_votes"

This reverts commit cd73dcd643.

* Allow for clock disparity

* Ensure errors are returned during batch processing

* Add block gossip verification

* Connect attestation processing to beacon chain

* Optimistically subscribe to subnets on the same slot

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>

* Update /validator/subscribe (#969)

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Remove unused function

* Upgrade sim (#972)

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Add the no-eth1-sim, refactor sim

* Sends discovery for persistent subnets (#973)

* main takes cmdline arguments

* Add test script

* Fix errors

* Snappy codec uses Uvi to encode/decode the length prefix (see the framing sketch below)

* Add more comments

* Run fmt

Co-authored-by: Grant Wuerker <gwuerker@gmail.com>
Co-authored-by: Age Manning <Age@AgeManning.com>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: b-m-f <max@ehlers.berlin>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
Co-authored-by: realbigsean <seananderson33@GMAIL.com>
Committed by Pawan Dhananjay (via GitHub), 2020-04-16 07:12:51 +05:30
commit 6a21c9ba6f, parent 040628bf3e
6 changed files with 708 additions and 43 deletions
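Several commits above ("Length prefix compressed data", "Uncompressed length prefix working", "Snappy codec uses Uvi to encode/decode the length prefix") converge on the wire format implemented in the diffs below: each chunk carries an unsigned-varint prefix with the length of the uncompressed SSZ bytes, followed by a snappy frame holding the compressed payload. The following is a minimal stand-alone sketch of that framing, assuming the snap crate used by the new codec; frame_chunk is a hypothetical helper, and the varint is hand-rolled here rather than going through unsigned_varint::codec::Uvi as the real codec does.

use snap::write::FrameEncoder;
use std::io::Write;

/// Hypothetical helper: frame one chunk the way the ssz_snappy codec does, i.e. an
/// unsigned-varint prefix carrying the *uncompressed* length, then a snappy frame.
fn frame_chunk(ssz_bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut wire = Vec::new();
    // Unsigned (LEB128) varint of the uncompressed payload length.
    let mut len = ssz_bytes.len();
    loop {
        if len < 0x80 {
            wire.push(len as u8);
            break;
        }
        wire.push((len as u8 & 0x7f) | 0x80);
        len >>= 7;
    }
    // Compress the payload with the snappy frame format and append it after the prefix.
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(ssz_bytes)?;
    encoder.flush()?;
    wire.extend_from_slice(encoder.get_ref());
    Ok(wire)
}

fn main() -> std::io::Result<()> {
    let wire = frame_chunk(b"example ssz bytes")?;
    // One varint byte (17) followed by the snappy frame for the 17-byte payload.
    println!("framed {} bytes", wire.len());
    Ok(())
}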

File: codec/mod.rs (eth2_libp2p RPC codec module)

@@ -1,8 +1,10 @@
pub(crate) mod base;
pub(crate) mod ssz;
pub(crate) mod ssz_snappy;
use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz::{SSZInboundCodec, SSZOutboundCodec};
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCErrorResponse, RPCRequest};
use libp2p::bytes::BytesMut;
@@ -11,10 +13,12 @@ use types::EthSpec;
// Known types of codecs
pub enum InboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec>),
SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec>),
}
pub enum OutboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec>),
SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec>),
}
@@ -25,6 +29,7 @@ impl<T: EthSpec> Encoder for InboundCodec<T> {
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.encode(item, dst),
InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
}
}
@@ -36,6 +41,7 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.decode(src),
InboundCodec::SSZSnappy(codec) => codec.decode(src),
}
}
}
@@ -47,6 +53,7 @@ impl<TSpec: EthSpec> Encoder for OutboundCodec<TSpec> {
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.encode(item, dst),
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
}
}
@@ -58,6 +65,7 @@ impl<T: EthSpec> Decoder for OutboundCodec<T> {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.decode(src),
OutboundCodec::SSZSnappy(codec) => codec.decode(src),
}
}
}
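The hunks above only add an SSZSnappy variant to each dispatch enum; every Encoder/Decoder implementation forwards to whichever concrete codec protocol negotiation selected. Below is a stripped-down sketch of that dispatch pattern, with hypothetical simplified types standing in for the Base*Codec wrappers and tokio's Encoder/Decoder traits.

// Hypothetical, simplified dispatch: the RPC layer only sees `InboundCodec`, and each
// call forwards to the codec chosen from the negotiated encoding ("ssz" or "ssz_snappy").
struct SszCodec;
struct SszSnappyCodec;

impl SszCodec {
    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Vec<u8>>, String> {
        Ok(Some(std::mem::take(src))) // placeholder: plain SSZ passthrough
    }
}

impl SszSnappyCodec {
    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Vec<u8>>, String> {
        Ok(Some(std::mem::take(src))) // placeholder: would snappy-decompress first
    }
}

enum InboundCodec {
    Ssz(SszCodec),
    SszSnappy(SszSnappyCodec),
}

impl InboundCodec {
    /// Pick the variant from the negotiated encoding string.
    fn from_encoding(encoding: &str) -> Self {
        match encoding {
            "ssz_snappy" => InboundCodec::SszSnappy(SszSnappyCodec),
            _ => InboundCodec::Ssz(SszCodec),
        }
    }

    /// Forward to the selected codec, mirroring the match arms above.
    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Vec<u8>>, String> {
        match self {
            InboundCodec::Ssz(codec) => codec.decode(src),
            InboundCodec::SszSnappy(codec) => codec.decode(src),
        }
    }
}

fn main() {
    let mut codec = InboundCodec::from_encoding("ssz_snappy");
    let mut bytes = b"raw chunk".to_vec();
    assert!(codec.decode(&mut bytes).unwrap().is_some());
}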

File: codec/ssz_snappy.rs (new file in the eth2_libp2p RPC codec module)

@@ -0,0 +1,373 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{
ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_META_DATA,
RPC_PING, RPC_STATUS,
},
};
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
use ssz::{Decode, Encode};
use std::io::Cursor;
use std::io::ErrorKind;
use std::io::{Read, Write};
use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::Uvi;
/* Inbound Codec */
pub struct SSZSnappyInboundCodec<TSpec: EthSpec> {
protocol: ProtocolId,
inner: Uvi<usize>,
len: Option<usize>,
/// Maximum number of bytes that can be sent in one req/resp chunked response.
max_packet_size: usize,
phantom: PhantomData<TSpec>,
}
impl<T: EthSpec> SSZSnappyInboundCodec<T> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy.
debug_assert!(protocol.encoding.as_str() == "ssz_snappy");
SSZSnappyInboundCodec {
inner: uvi_codec,
protocol,
len: None,
phantom: PhantomData,
max_packet_size,
}
}
}
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCErrorResponse<TSpec>;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCErrorResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
},
RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(),
RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(),
RPCErrorResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::Custom(
"attempting to encode data > max_packet_size".into(),
));
}
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as an unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}
// Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::Custom(
"attempting to decode data > max_packet_size".into(),
));
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
src.split_to(n as usize);
match self.protocol.message_name.as_str() {
RPC_STATUS => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_GOODBYE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
&decoded_buffer,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&decoded_buffer)?,
}))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_PING => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes(
&decoded_buffer,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_META_DATA => match self.protocol.version.as_str() {
"1" => {
if decoded_buffer.len() > 0 {
Err(RPCError::Custom(
"Get metadata request should be empty".into(),
))
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))
}
}
_ => unreachable!("Cannot negotiate an unknown version"),
},
_ => unreachable!("Cannot negotiate an unknown protocol"),
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}
/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZSnappyOutboundCodec<TSpec: EthSpec> {
inner: Uvi<usize>,
len: Option<usize>,
protocol: ProtocolId,
/// Maximum number of bytes that can be sent in one req/resp chunked response.
max_packet_size: usize,
phantom: PhantomData<TSpec>,
}
impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy.
debug_assert!(protocol.encoding.as_str() == "ssz_snappy");
SSZSnappyOutboundCodec {
inner: uvi_codec,
protocol,
max_packet_size,
len: None,
phantom: PhantomData,
}
}
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder for SSZSnappyOutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::Custom(
"attempting to encode data > max_packet_size".into(),
));
}
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as an unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}
// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
type Item = RPCResponse<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length as usize);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::Custom(
"attempting to decode data > max_packet_size".into(),
));
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
src.split_to(n as usize);
match self.protocol.message_name.as_str() {
RPC_STATUS => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_GOODBYE => {
Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response"))
}
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_PING => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_META_DATA => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
&decoded_buffer,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
_ => unreachable!("Cannot negotiate an unknown protocol"),
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}
impl<TSpec: EthSpec> OutboundCodec for SSZSnappyOutboundCodec<TSpec> {
type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length as usize);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::Custom(
"attempting to decode data > max_packet_size".into(),
));
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
src.split_to(n as usize);
Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?))
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}
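All three decode paths above follow the same stateful pattern: cache the varint length in self.len, return Ok(None) while the prefix or the snappy frame is still incomplete, and consume only the compressed bytes the FrameDecoder actually read once a full chunk is available. Below is a minimal stand-alone illustration of that pattern, assuming the snap crate; ChunkDecoder and read_varint are hypothetical names, and SSZ deserialization plus the protocol/version match are omitted.

use snap::read::FrameDecoder;
use std::io::{Cursor, ErrorKind, Read};

/// Hypothetical decoder state mirroring the `len: Option<usize>` field above.
struct ChunkDecoder {
    len: Option<usize>,
    max_packet_size: usize,
}

impl ChunkDecoder {
    /// Try to decode one chunk; `Ok(None)` means "need more bytes".
    fn decode(&mut self, buf: &mut Vec<u8>) -> std::io::Result<Option<Vec<u8>>> {
        // Step 1: read and cache the unsigned-varint length of the uncompressed payload.
        if self.len.is_none() {
            match read_varint(buf) {
                Some((len, consumed)) => {
                    self.len = Some(len);
                    buf.drain(..consumed);
                }
                None => return Ok(None), // prefix incomplete
            }
        }
        let length = self.len.expect("length set above");
        // Reject oversized chunks before attempting to decompress them.
        if length > self.max_packet_size {
            return Err(std::io::Error::new(ErrorKind::InvalidData, "chunk > max_packet_size"));
        }
        // Step 2: try to read exactly `length` uncompressed bytes from the snappy frame.
        let mut reader = FrameDecoder::new(Cursor::new(&buf[..]));
        let mut decoded = vec![0u8; length];
        match reader.read_exact(&mut decoded) {
            Ok(()) => {
                // Consume only the compressed bytes actually read, then reset the state.
                let consumed = reader.get_ref().position() as usize;
                buf.drain(..consumed);
                self.len = None;
                Ok(Some(decoded))
            }
            // Frame not fully buffered yet: keep `self.len` and wait for more bytes.
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(None),
            Err(e) => Err(e),
        }
    }
}

/// Decode an unsigned LEB128 varint; returns (value, bytes consumed), or None if incomplete.
fn read_varint(buf: &[u8]) -> Option<(usize, usize)> {
    let mut value = 0usize;
    for (i, byte) in buf.iter().enumerate() {
        value |= ((byte & 0x7f) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

fn main() {
    let mut dec = ChunkDecoder { len: None, max_packet_size: 1 << 20 };
    let mut buf: Vec<u8> = Vec::new();
    // With nothing buffered the decoder just reports "need more bytes".
    assert!(dec.decode(&mut buf).unwrap().is_none());
}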