From e57aea924acf5cbabdcea18895ac07e38a425ed7 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 11 May 2020 17:53:53 +0530 Subject: [PATCH] Update tokio::codec to futures_codec (#1128) --- Cargo.lock | 21 +++++++++--- beacon_node/eth2-libp2p/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/rpc/codec/base.rs | 34 ++++++++++--------- beacon_node/eth2-libp2p/src/rpc/codec/mod.rs | 12 ++++--- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 14 ++++---- .../eth2-libp2p/src/rpc/codec/ssz_snappy.rs | 14 ++++---- 6 files changed, 58 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0240824416..6c27f7e41a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1239,6 +1239,7 @@ dependencies = [ "eth2_ssz_types", "fnv", "futures 0.3.4", + "futures_codec 0.4.0", "hashset_delay", "hex 0.4.2", "lazy_static", @@ -1259,7 +1260,6 @@ dependencies = [ "tiny-keccak 2.0.2", "tokio 0.2.20", "tokio-io-timeout", - "tokio-util", "types", "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", "version", @@ -1611,6 +1611,18 @@ dependencies = [ "pin-project", ] +[[package]] +name = "futures_codec" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe8859feb7140742ed1a2a85a07941100ad2b5f98a421b353931d718a34144d1" +dependencies = [ + "bytes 0.5.4", + "futures 0.3.4", + "memchr", + "pin-project", +] + [[package]] name = "gcc" version = "0.3.55" @@ -2302,7 +2314,7 @@ dependencies = [ "bytes 0.5.4", "fnv", "futures 0.3.4", - "futures_codec", + "futures_codec 0.3.4", "libp2p-core", "libp2p-swarm", "log 0.4.8", @@ -2341,7 +2353,7 @@ dependencies = [ "bytes 0.5.4", "fnv", "futures 0.3.4", - "futures_codec", + "futures_codec 0.3.4", "libp2p-core", "log 0.4.8", "parking_lot 0.10.2", @@ -4770,7 +4782,6 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.4", "futures-core", - "futures-io", "futures-sink", "log 0.4.8", "pin-project-lite", @@ -4990,7 +5001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f67332660eb59a6f1eb24ff1220c9e8d01738a8503c6002e30bcfe4bd9f2b4a9" dependencies = [ "bytes 0.5.4", - "futures_codec", + "futures_codec 0.3.4", ] [[package]] diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index a468e373f3..6ba255b395 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -31,9 +31,9 @@ base64 = "0.12.0" snap = "1.0.0" void = "1.0.2" tokio-io-timeout = "0.4.0" -tokio-util = { version = "0.3.1", features = ["codec", "compat"] } discv5 = "0.1.0-alpha.1" tiny-keccak = "2.0.2" +futures_codec = "0.4.0" [dependencies.libp2p] version = "0.18.1" diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 8f9278a316..70cab5b68f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -4,10 +4,10 @@ use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; +use futures_codec::{Decoder, Encoder}; use types::EthSpec; -pub trait OutboundCodec: Encoder + Decoder { +pub trait OutboundCodec: Encoder + Decoder { type ErrorType; fn decode_error( @@ -21,7 +21,7 @@ pub trait OutboundCodec: Encoder + Decoder { pub struct BaseInboundCodec where - TCodec: Encoder> + Decoder, + TCodec: Encoder + Decoder, TSpec: EthSpec, { /// Inner codec for handling various encodings @@ -31,7 +31,7 @@ where impl BaseInboundCodec where - TCodec: Encoder> + Decoder, + TCodec: Encoder + Decoder, TSpec: EthSpec, { pub fn new(codec: TCodec) -> Self { @@ -46,7 +46,7 @@ where // This deals with Decoding RPC Responses from other peers and encoding our requests pub struct BaseOutboundCodec where - TOutboundCodec: OutboundCodec>, + TOutboundCodec: OutboundCodec, TSpec: EthSpec, { /// Inner codec for handling various encodings. @@ -59,7 +59,7 @@ where impl BaseOutboundCodec where TSpec: EthSpec, - TOutboundCodec: OutboundCodec>, + TOutboundCodec: OutboundCodec, { pub fn new(codec: TOutboundCodec) -> Self { BaseOutboundCodec { @@ -75,16 +75,17 @@ where /* Base Inbound Codec */ // This Encodes RPC Responses sent to external peers -impl Encoder> for BaseInboundCodec +impl Encoder for BaseInboundCodec where TSpec: EthSpec, - TCodec: Decoder + Encoder>, + TCodec: Decoder + Encoder, { - type Error = >>::Error; + type Item = RPCCodedResponse; + type Error = ::Error; fn encode( &mut self, - item: RPCCodedResponse, + item: Self::Item, dst: &mut BytesMut, ) -> Result<(), Self::Error> { dst.clear(); @@ -101,7 +102,7 @@ where impl Decoder for BaseInboundCodec where TSpec: EthSpec, - TCodec: Encoder> + Decoder>, + TCodec: Encoder + Decoder>, { type Item = RPCRequest; type Error = ::Error; @@ -114,14 +115,15 @@ where /* Base Outbound Codec */ // This Encodes RPC Requests sent to external peers -impl Encoder> for BaseOutboundCodec +impl Encoder for BaseOutboundCodec where TSpec: EthSpec, - TCodec: OutboundCodec> + Encoder>, + TCodec: OutboundCodec + Encoder>, { - type Error = >>::Error; + type Item = RPCRequest; + type Error = ::Error; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { self.inner.encode(item, dst) } } @@ -130,7 +132,7 @@ where impl Decoder for BaseOutboundCodec where TSpec: EthSpec, - TCodec: OutboundCodec, ErrorType = ErrorMessage> + TCodec: OutboundCodec + Decoder>, { type Item = RPCCodedResponse; diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index c117f52feb..c4aed8eae5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -8,7 +8,7 @@ use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; use crate::rpc::{RPCCodedResponse, RPCRequest}; use libp2p::bytes::BytesMut; -use tokio_util::codec::{Decoder, Encoder}; +use futures_codec::{Encoder, Decoder}; use types::EthSpec; // Known types of codecs @@ -22,10 +22,11 @@ pub enum OutboundCodec { SSZ(BaseOutboundCodec, TSpec>), } -impl Encoder> for InboundCodec { +impl Encoder for InboundCodec { + type Item = RPCCodedResponse; type Error = RPCError; - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { InboundCodec::SSZ(codec) => codec.encode(item, dst), InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), @@ -45,10 +46,11 @@ impl Decoder for InboundCodec { } } -impl Encoder> for OutboundCodec { +impl Encoder for OutboundCodec { + type Item = RPCRequest; type Error = RPCError; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { OutboundCodec::SSZ(codec) => codec.encode(item, dst), OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 0f763e997d..f5b16d56f4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -7,7 +7,7 @@ use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::{BufMut, Bytes, BytesMut}; use ssz::{Decode, Encode}; use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; +use futures_codec::{Decoder, Encoder}; use types::{EthSpec, SignedBeaconBlock}; use unsigned_varint::codec::UviBytes; @@ -36,12 +36,13 @@ impl SSZInboundCodec { } // Encoder for inbound streams: Encodes RPC Responses sent to peers. -impl Encoder> for SSZInboundCodec { +impl Encoder for SSZInboundCodec { + type Item = RPCCodedResponse; type Error = RPCError; fn encode( &mut self, - item: RPCCodedResponse, + item: Self::Item, dst: &mut BytesMut, ) -> Result<(), Self::Error> { let bytes = match item { @@ -148,10 +149,11 @@ impl SSZOutboundCodec { } // Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder> for SSZOutboundCodec { +impl Encoder for SSZOutboundCodec { + type Item = RPCRequest; type Error = RPCError; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { RPCRequest::Status(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(), @@ -241,7 +243,7 @@ impl Decoder for SSZOutboundCodec { } } -impl OutboundCodec> for SSZOutboundCodec { +impl OutboundCodec for SSZOutboundCodec { type ErrorType = ErrorMessage; fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index 6c6e09f4db..1971853447 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -12,7 +12,7 @@ use std::io::Cursor; use std::io::ErrorKind; use std::io::{Read, Write}; use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; +use futures_codec::{Decoder, Encoder}; use types::{EthSpec, SignedBeaconBlock}; use unsigned_varint::codec::Uvi; @@ -44,12 +44,13 @@ impl SSZSnappyInboundCodec { } // Encoder for inbound streams: Encodes RPC Responses sent to peers. -impl Encoder> for SSZSnappyInboundCodec { +impl Encoder for SSZSnappyInboundCodec { + type Item = RPCCodedResponse; type Error = RPCError; fn encode( &mut self, - item: RPCCodedResponse, + item: Self::Item, dst: &mut BytesMut, ) -> Result<(), Self::Error> { let bytes = match item { @@ -196,10 +197,11 @@ impl SSZSnappyOutboundCodec { } // Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder> for SSZSnappyOutboundCodec { +impl Encoder for SSZSnappyOutboundCodec { + type Item = RPCRequest; type Error = RPCError; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { RPCRequest::Status(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(), @@ -309,7 +311,7 @@ impl Decoder for SSZSnappyOutboundCodec { } } -impl OutboundCodec> for SSZSnappyOutboundCodec { +impl OutboundCodec for SSZSnappyOutboundCodec { type ErrorType = ErrorMessage; fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> {