From d5a2de759bc089d122a95e17eccb0e4eebef752a Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Fri, 25 Nov 2022 05:19:00 +0000 Subject: [PATCH] Added LightClientBootstrap V1 (#3711) ## Issue Addressed Partially addresses #3651 ## Proposed Changes Adds server-side support for light_client_bootstrap_v1 topic ## Additional Info This PR, creates each time a bootstrap without using cache, I do not know how necessary a cache is in this case as this topic is not supposed to be called frequently and IMHO we can just prevent abuse by using the limiter, but let me know what you think or if there is any caveat to this, or if it is necessary only for the sake of good practice. Co-authored-by: Pawan Dhananjay --- beacon_node/beacon_chain/src/chain_config.rs | 3 - beacon_node/lighthouse_network/src/config.rs | 4 + .../src/peer_manager/mod.rs | 3 + .../src/rpc/codec/ssz_snappy.rs | 18 ++++- .../lighthouse_network/src/rpc/methods.rs | 18 ++++- beacon_node/lighthouse_network/src/rpc/mod.rs | 14 +++- .../lighthouse_network/src/rpc/outbound.rs | 12 ++- .../lighthouse_network/src/rpc/protocol.rs | 76 +++++++------------ .../src/rpc/rate_limiter.rs | 11 +++ .../src/service/api_types.rs | 14 +++- .../lighthouse_network/src/service/mod.rs | 21 ++++- .../network/src/beacon_processor/mod.rs | 45 ++++++++++- .../beacon_processor/worker/rpc_methods.rs | 75 +++++++++++++++++- beacon_node/network/src/router/mod.rs | 4 + beacon_node/network/src/router/processor.rs | 12 +++ beacon_node/src/config.rs | 6 +- lighthouse/tests/beacon_node.rs | 4 +- 17 files changed, 271 insertions(+), 69 deletions(-) diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index f970c5607e..286cc17a96 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -47,8 +47,6 @@ pub struct ChainConfig { pub count_unrealized_full: CountUnrealizedFull, /// Optionally set timeout for calls to checkpoint sync endpoint. pub checkpoint_sync_url_timeout: u64, - /// Whether to enable the light client server protocol. - pub enable_light_client_server: bool, } impl Default for ChainConfig { @@ -70,7 +68,6 @@ impl Default for ChainConfig { paranoid_block_proposal: false, count_unrealized_full: CountUnrealizedFull::default(), checkpoint_sync_url_timeout: 60, - enable_light_client_server: false, } } } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 71566b8778..c8ef8809d4 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -130,6 +130,9 @@ pub struct Config { /// Whether metrics are enabled. pub metrics_enabled: bool, + + /// Whether light client protocols should be enabled. + pub enable_light_client_server: bool, } impl Default for Config { @@ -207,6 +210,7 @@ impl Default for Config { shutdown_after_sync: false, topics: Vec::new(), metrics_enabled: false, + enable_light_client_server: false, } } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 0f29135956..8102fa82a0 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -501,6 +501,7 @@ impl PeerManager { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::LightClientBootstrap => PeerAction::LowToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -517,6 +518,7 @@ impl PeerManager { Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, Protocol::Goodbye => return, + Protocol::LightClientBootstrap => return, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, } @@ -531,6 +533,7 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::LightClientBootstrap => return, Protocol::Goodbye => return, Protocol::MetaData => return, Protocol::Status => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index a46a05a8ce..a4dd602b3f 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -16,8 +16,8 @@ use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; use types::{ - EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockMerge, + light_client_bootstrap::LightClientBootstrap, EthSpec, ForkContext, ForkName, Hash256, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockMerge, }; use unsigned_varint::codec::Uvi; @@ -70,6 +70,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => // Encode the correct version of the MetaData response based on the negotiated version. @@ -230,6 +231,7 @@ impl Encoder> for SSZSnappyOutboundCodec< OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode + OutboundRequest::LightClientBootstrap(req) => req.as_ssz_bytes(), }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { @@ -472,7 +474,11 @@ fn handle_v1_request( Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), - + Protocol::LightClientBootstrap => Ok(Some(InboundRequest::LightClientBootstrap( + LightClientBootstrapRequest { + root: Hash256::from_ssz_bytes(decoded_buffer)?, + }, + ))), // MetaData requests return early from InboundUpgrade and do not reach the decoder. // Handle this case just for completeness. Protocol::MetaData => { @@ -544,6 +550,9 @@ fn handle_v1_response( Protocol::MetaData => Ok(Some(RPCResponse::MetaData(MetaData::V1( MetaDataV1::from_ssz_bytes(decoded_buffer)?, )))), + Protocol::LightClientBootstrap => Ok(Some(RPCResponse::LightClientBootstrap( + LightClientBootstrap::from_ssz_bytes(decoded_buffer)?, + ))), } } @@ -867,6 +876,9 @@ mod tests { OutboundRequest::MetaData(metadata) => { assert_eq!(decoded, InboundRequest::MetaData(metadata)) } + OutboundRequest::LightClientBootstrap(bootstrap) => { + assert_eq!(decoded, InboundRequest::LightClientBootstrap(bootstrap)) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 26d755a6e0..5da595c3db 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -12,7 +12,9 @@ use std::ops::Deref; use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; /// Maximum number of blocks in a single request. pub type MaxRequestBlocks = U1024; @@ -243,6 +245,9 @@ pub enum RPCResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get LIGHTCLIENT_BOOTSTRAP request. + LightClientBootstrap(LightClientBootstrap), + /// A PONG response to a PING request. Pong(Ping), @@ -273,6 +278,12 @@ pub enum RPCCodedResponse { StreamTermination(ResponseTermination), } +/// Request a light_client_bootstrap for lightclients peers. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct LightClientBootstrapRequest { + pub root: Hash256, +} + /// The code assigned to an erroneous `RPCResponse`. #[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr)] #[strum(serialize_all = "snake_case")] @@ -321,6 +332,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, + RPCResponse::LightClientBootstrap(_) => false, }, RPCCodedResponse::Error(_, _) => true, // Stream terminations are part of responses that have chunks @@ -355,6 +367,7 @@ impl RPCResponse { RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, + RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, } } } @@ -390,6 +403,9 @@ impl std::fmt::Display for RPCResponse { } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), + RPCResponse::LightClientBootstrap(bootstrap) => { + write!(f, "LightClientBootstrap Slot: {}", bootstrap.header.slot) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 7b0092ef71..203a642a8b 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -26,8 +26,8 @@ pub(crate) use protocol::{InboundRequest, RPCProtocol}; pub use handler::SubstreamId; pub use methods::{ - BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks, - RPCResponseErrorCode, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, + BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest, + MaxRequestBlocks, RPCResponseErrorCode, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub(crate) use outbound::OutboundRequest; pub use protocol::{max_rpc_size, Protocol, RPCError}; @@ -108,18 +108,24 @@ pub struct RPC { /// Queue of events to be processed. events: Vec, RPCHandler>>, fork_context: Arc, + enable_light_client_server: bool, /// Slog logger for RPC behaviour. log: slog::Logger, } impl RPC { - pub fn new(fork_context: Arc, log: slog::Logger) -> Self { + pub fn new( + fork_context: Arc, + enable_light_client_server: bool, + log: slog::Logger, + ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); let limiter = RPCRateLimiterBuilder::new() .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) .n_every(Protocol::Ping, 2, Duration::from_secs(10)) .n_every(Protocol::Status, 5, Duration::from_secs(15)) .one_every(Protocol::Goodbye, Duration::from_secs(10)) + .one_every(Protocol::LightClientBootstrap, Duration::from_secs(10)) .n_every( Protocol::BlocksByRange, methods::MAX_REQUEST_BLOCKS, @@ -132,6 +138,7 @@ impl RPC { limiter, events: Vec::new(), fork_context, + enable_light_client_server, log, } } @@ -188,6 +195,7 @@ where RPCProtocol { fork_context: self.fork_context.clone(), max_rpc_size: max_rpc_size(&self.fork_context), + enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, }, (), diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 7d5acc4364..774303800e 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -38,6 +38,7 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + LightClientBootstrap(LightClientBootstrapRequest), Ping(Ping), MetaData(PhantomData), } @@ -84,9 +85,12 @@ impl OutboundRequest { ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), ], + // Note: This match arm is technically unreachable as we only respond to light client requests + // that we generate from the beacon state. + // We do not make light client rpc requests from the beacon node + OutboundRequest::LightClientBootstrap(_) => vec![], } } - /* These functions are used in the handler for stream management */ /// Number of responses expected for this request. @@ -98,6 +102,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, + OutboundRequest::LightClientBootstrap(_) => 1, } } @@ -110,6 +115,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::MetaData(_) => Protocol::MetaData, + OutboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap, } } @@ -121,6 +127,7 @@ impl OutboundRequest { // variants that have `multiple_responses()` can have values. OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + OutboundRequest::LightClientBootstrap(_) => unreachable!(), OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -178,6 +185,9 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), + OutboundRequest::LightClientBootstrap(bootstrap) => { + write!(f, "Lightclient Bootstrap: {}", bootstrap.root) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 81960214b1..1f40f81971 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -153,6 +153,8 @@ pub enum Protocol { Ping, /// The `MetaData` protocol name. MetaData, + /// The `LightClientBootstrap` protocol name. + LightClientBootstrap, } /// RPC Versions @@ -179,6 +181,7 @@ impl std::fmt::Display for Protocol { Protocol::BlocksByRoot => "beacon_blocks_by_root", Protocol::Ping => "ping", Protocol::MetaData => "metadata", + Protocol::LightClientBootstrap => "light_client_bootstrap", }; f.write_str(repr) } @@ -207,6 +210,7 @@ impl std::fmt::Display for Version { pub struct RPCProtocol { pub fork_context: Arc, pub max_rpc_size: usize, + pub enable_light_client_server: bool, pub phantom: PhantomData, } @@ -216,7 +220,7 @@ impl UpgradeInfo for RPCProtocol { /// The list of supported RPC protocols for Lighthouse. fn protocol_info(&self) -> Self::InfoIter { - vec![ + let mut supported_protocols = vec![ ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), // V2 variants have higher preference then V1 @@ -227,7 +231,15 @@ impl UpgradeInfo for RPCProtocol { ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), - ] + ]; + if self.enable_light_client_server { + supported_protocols.push(ProtocolId::new( + Protocol::LightClientBootstrap, + Version::V1, + Encoding::SSZSnappy, + )); + } + supported_protocols } } @@ -289,6 +301,10 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), + Protocol::LightClientBootstrap => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::MetaData => RpcLimits::new(0, 0), // Metadata requests are empty } } @@ -312,6 +328,10 @@ impl ProtocolId { as Encode>::ssz_fixed_len(), as Encode>::ssz_fixed_len(), ), + Protocol::LightClientBootstrap => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), } } @@ -417,57 +437,13 @@ pub enum InboundRequest { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + LightClientBootstrap(LightClientBootstrapRequest), Ping(Ping), MetaData(PhantomData), } -impl UpgradeInfo for InboundRequest { - type Info = ProtocolId; - type InfoIter = Vec; - - // add further protocols as we support more encodings/versions - fn protocol_info(&self) -> Self::InfoIter { - self.supported_protocols() - } -} - /// Implements the encoding per supported protocol for `RPCRequest`. impl InboundRequest { - pub fn supported_protocols(&self) -> Vec { - match self { - // add more protocols when versions/encodings are supported - InboundRequest::Status(_) => vec![ProtocolId::new( - Protocol::Status, - Version::V1, - Encoding::SSZSnappy, - )], - InboundRequest::Goodbye(_) => vec![ProtocolId::new( - Protocol::Goodbye, - Version::V1, - Encoding::SSZSnappy, - )], - InboundRequest::BlocksByRange(_) => vec![ - // V2 has higher preference when negotiating a stream - ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), - ], - InboundRequest::BlocksByRoot(_) => vec![ - // V2 has higher preference when negotiating a stream - ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), - ], - InboundRequest::Ping(_) => vec![ProtocolId::new( - Protocol::Ping, - Version::V1, - Encoding::SSZSnappy, - )], - InboundRequest::MetaData(_) => vec![ - ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), - ], - } - } - /* These functions are used in the handler for stream management */ /// Number of responses expected for this request. @@ -479,6 +455,7 @@ impl InboundRequest { InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, + InboundRequest::LightClientBootstrap(_) => 1, } } @@ -491,6 +468,7 @@ impl InboundRequest { InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::MetaData(_) => Protocol::MetaData, + InboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap, } } @@ -506,6 +484,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), InboundRequest::MetaData(_) => unreachable!(), + InboundRequest::LightClientBootstrap(_) => unreachable!(), } } } @@ -609,6 +588,9 @@ impl std::fmt::Display for InboundRequest { InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), + InboundRequest::LightClientBootstrap(bootstrap) => { + write!(f, "LightClientBootstrap: {}", bootstrap.root) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 70b14c33de..6ba9f6e941 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -73,6 +73,8 @@ pub struct RPCRateLimiter { bbrange_rl: Limiter, /// BlocksByRoot rate limiter. bbroots_rl: Limiter, + /// LightClientBootstrap rate limiter. + lcbootstrap_rl: Limiter, } /// Error type for non conformant requests @@ -98,6 +100,8 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the LightClientBootstrap protocol. + lcbootstrap_quota: Option, } impl RPCRateLimiterBuilder { @@ -116,6 +120,7 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, } self } @@ -155,6 +160,9 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let lcbootstrap_quote = self + .lcbootstrap_quota + .ok_or("LightClientBootstrap quota not specified")?; // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; @@ -163,6 +171,7 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let lcbootstrap_rl = Limiter::from_quota(lcbootstrap_quote)?; // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); @@ -176,6 +185,7 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + lcbootstrap_rl, init_time: Instant::now(), }) } @@ -199,6 +209,7 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::LightClientBootstrap => &mut self.lcbootstrap_rl, }; check(limiter) } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index e5d81737cf..849a86f51b 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,12 +1,12 @@ use std::sync::Arc; use libp2p::core::connection::ConnectionId; -use types::{EthSpec, SignedBeaconBlock}; +use types::{light_client_bootstrap::LightClientBootstrap, EthSpec, SignedBeaconBlock}; use crate::rpc::{ methods::{ - BlocksByRangeRequest, BlocksByRootRequest, OldBlocksByRangeRequest, RPCCodedResponse, - RPCResponse, ResponseTermination, StatusMessage, + BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, + OldBlocksByRangeRequest, RPCCodedResponse, RPCResponse, ResponseTermination, StatusMessage, }, OutboundRequest, SubstreamId, }; @@ -34,6 +34,8 @@ pub enum Request { BlocksByRange(BlocksByRangeRequest), /// A request blocks root request. BlocksByRoot(BlocksByRootRequest), + // light client bootstrap request + LightClientBootstrap(LightClientBootstrapRequest), } impl std::convert::From for OutboundRequest { @@ -47,6 +49,7 @@ impl std::convert::From for OutboundRequest { step: 1, }) } + Request::LightClientBootstrap(b) => OutboundRequest::LightClientBootstrap(b), Request::Status(s) => OutboundRequest::Status(s), } } @@ -66,6 +69,8 @@ pub enum Response { BlocksByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), + /// A response to a LightClientUpdate request. + LightClientBootstrap(LightClientBootstrap), } impl std::convert::From> for RPCCodedResponse { @@ -80,6 +85,9 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), + Response::LightClientBootstrap(b) => { + RPCCodedResponse::Success(RPCResponse::LightClientBootstrap(b)) + } } } } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 97d96d171d..a6f1ce20ad 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -259,7 +259,11 @@ impl Network { (gossipsub, update_gossipsub_scores) }; - let eth2_rpc = RPC::new(ctx.fork_context.clone(), log.clone()); + let eth2_rpc = RPC::new( + ctx.fork_context.clone(), + config.enable_light_client_server, + log.clone(), + ); let discovery = { // Build and start the discovery sub-behaviour @@ -978,6 +982,9 @@ impl Network { Request::Status(_) => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["status"]) } + Request::LightClientBootstrap(_) => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["light_client_bootstrap"]) + } Request::BlocksByRange { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_range"]) } @@ -1247,6 +1254,14 @@ impl Network { ); Some(event) } + InboundRequest::LightClientBootstrap(req) => { + let event = self.build_request( + peer_request_id, + peer_id, + Request::LightClientBootstrap(req), + ); + Some(event) + } } } Ok(RPCReceived::Response(id, resp)) => { @@ -1274,6 +1289,10 @@ impl Network { RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + // Should never be reached + RPCResponse::LightClientBootstrap(bootstrap) => { + self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap)) + } } } Ok(RPCReceived::EndOfStream(id, termination)) => { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index f477878ac0..aa4286b9cd 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -45,6 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock}; use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; +use lighthouse_network::rpc::LightClientBootstrapRequest; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -156,6 +157,10 @@ const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -195,6 +200,7 @@ pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; +pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; @@ -557,6 +563,22 @@ impl WorkEvent { } } + /// Create a new work event to process `LightClientBootstrap`s from the RPC network. + pub fn lightclient_bootstrap_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: LightClientBootstrapRequest, + ) -> Self { + Self { + drop_during_sync: true, + work: Work::LightClientBootstrapRequest { + peer_id, + request_id, + request, + }, + } + } + /// Get a `str` representation of the type of work this `WorkEvent` contains. pub fn work_type(&self) -> &'static str { self.work.str_id() @@ -733,6 +755,11 @@ pub enum Work { request_id: PeerRequestId, request: BlocksByRootRequest, }, + LightClientBootstrapRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: LightClientBootstrapRequest, + }, } impl Work { @@ -755,6 +782,7 @@ impl Work { Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, + Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, } @@ -898,7 +926,7 @@ impl BeaconProcessor { let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); - + let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); @@ -1137,6 +1165,8 @@ impl BeaconProcessor { } else if let Some(item) = backfill_chain_segment.pop() { self.spawn_worker(item, toolbox); // This statement should always be the final else statement. + } else if let Some(item) = lcbootstrap_queue.pop() { + self.spawn_worker(item, toolbox); } else { // Let the journal know that a worker is freed and there's nothing else // for it to do. @@ -1237,6 +1267,9 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { bbroots_queue.push(work, work_id, &self.log) } + Work::LightClientBootstrapRequest { .. } => { + lcbootstrap_queue.push(work, work_id, &self.log) + } Work::UnknownBlockAttestation { .. } => { unknown_block_attestation_queue.push(work) } @@ -1594,6 +1627,16 @@ impl BeaconProcessor { request, ) }), + /* + * Processing of lightclient bootstrap requests from other peers. + */ + Work::LightClientBootstrapRequest { + peer_id, + request_id, + request, + } => task_spawner.spawn_blocking(move || { + worker.handle_light_client_bootstrap(peer_id, request_id, request) + }), Work::UnknownBlockAttestation { message_id, peer_id, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 37aee01716..3e354a70d2 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -11,7 +11,7 @@ use slog::{debug, error}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -204,6 +204,79 @@ impl Worker { ) } + /// Handle a `BlocksByRoot` request from the peer. + pub fn handle_light_client_bootstrap( + self, + peer_id: PeerId, + request_id: PeerRequestId, + request: LightClientBootstrapRequest, + ) { + let block_root = request.root; + let state_root = match self.chain.get_blinded_block(&block_root) { + Ok(signed_block) => match signed_block { + Some(signed_block) => signed_block.state_root(), + None => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not avaiable".into(), + request_id, + ); + return; + } + }, + Err(_) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not avaiable".into(), + request_id, + ); + return; + } + }; + let mut beacon_state = match self.chain.get_state(&state_root, None) { + Ok(beacon_state) => match beacon_state { + Some(state) => state, + None => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not avaiable".into(), + request_id, + ); + return; + } + }, + Err(_) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not avaiable".into(), + request_id, + ); + return; + } + }; + let bootstrap = match LightClientBootstrap::from_beacon_state(&mut beacon_state) { + Ok(bootstrap) => bootstrap, + Err(_) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not avaiable".into(), + request_id, + ); + return; + } + }; + self.send_response( + peer_id, + Response::LightClientBootstrap(bootstrap), + request_id, + ) + } + /// Handle a `BlocksByRange` request from the peer. pub fn handle_blocks_by_range_request( self, diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 03b877506f..5df308f259 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -168,6 +168,9 @@ impl Router { Request::BlocksByRoot(request) => self .processor .on_blocks_by_root_request(peer_id, id, request), + Request::LightClientBootstrap(request) => self + .processor + .on_lightclient_bootstrap(peer_id, id, request), } } @@ -192,6 +195,7 @@ impl Router { self.processor .on_blocks_by_root_response(peer_id, request_id, beacon_block); } + Response::LightClientBootstrap(_) => unreachable!(), } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index ce11cbdcef..3c9a4a81fb 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -160,6 +160,18 @@ impl Processor { )) } + /// Handle a `LightClientBootstrap` request from the peer. + pub fn on_lightclient_bootstrap( + &mut self, + peer_id: PeerId, + request_id: PeerRequestId, + request: LightClientBootstrapRequest, + ) { + self.send_beacon_processor_work(BeaconWorkEvent::lightclient_bootstrap_request( + peer_id, request_id, request, + )) + } + /// Handle a `BlocksByRange` request from the peer. pub fn on_blocks_by_range_request( &mut self, diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 406074fe38..472708ecb8 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -708,9 +708,6 @@ pub fn get_config( client_config.chain.builder_fallback_disable_checks = cli_args.is_present("builder-fallback-disable-checks"); - // Light client server config. - client_config.chain.enable_light_client_server = cli_args.is_present("light-client-server"); - Ok(client_config) } @@ -922,6 +919,9 @@ pub fn set_network_config( config.discv5_config.table_filter = |_| true; } + // Light client server config. + config.enable_light_client_server = cli_args.is_present("light-client-server"); + Ok(()) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 0d70049250..b1ad50092c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1587,7 +1587,7 @@ fn sync_eth1_chain_disable_deposit_contract_sync_flag() { fn light_client_server_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert_eq!(config.chain.enable_light_client_server, false)); + .with_config(|config| assert_eq!(config.network.enable_light_client_server, false)); } #[test] @@ -1595,5 +1595,5 @@ fn light_client_server_enabled() { CommandLineTest::new() .flag("light-client-server", None) .run_with_zero_port() - .with_config(|config| assert_eq!(config.chain.enable_light_client_server, true)); + .with_config(|config| assert_eq!(config.network.enable_light_client_server, true)); }