From 7b5be8b1e74cf0ce71aeda71e017dd426920e955 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Wed, 3 Sep 2025 11:00:15 +0900 Subject: [PATCH] Remove ttfb_timeout and resp_timeout (#7925) `TTFB_TIMEOUT` was deprecated in https://github.com/ethereum/consensus-specs/pull/3767. Remove `ttfb_timeout` from `InboundUpgrade` and other related structs. (Update) Also removed `resp_timeout` and also removed the `NetworkParams` struct since its fields are no longer used. https://github.com/sigp/lighthouse/pull/7925#issuecomment-3226886352 --- Cargo.lock | 11 ------- beacon_node/lighthouse_network/Cargo.toml | 1 - .../lighthouse_network/src/rpc/handler.rs | 17 +++++------ beacon_node/lighthouse_network/src/rpc/mod.rs | 29 ++----------------- .../lighthouse_network/src/rpc/protocol.rs | 9 ++---- .../lighthouse_network/src/service/mod.rs | 10 ++----- 6 files changed, 13 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 114b02827e..4020d9611f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5642,7 +5642,6 @@ dependencies = [ "tempfile", "tiny-keccak", "tokio", - "tokio-io-timeout", "tokio-util", "tracing", "tracing-subscriber", @@ -9432,16 +9431,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-macros" version = "2.5.0" diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 0b2ca9e818..9963cc0bc4 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -45,7 +45,6 @@ superstruct = { workspace = true } task_executor = { workspace = true } tiny-keccak = "2" tokio = { workspace = true } -tokio-io-timeout = "1" tokio-util = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 972d45cdfe..720895bbe7 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -39,6 +39,9 @@ const SHUTDOWN_TIMEOUT_SECS: u64 = 15; /// Maximum number of simultaneous inbound substreams we keep for this peer. const MAX_INBOUND_SUBSTREAMS: usize = 32; +/// Timeout that will be used for inbound and outbound responses. +const RESP_TIMEOUT: Duration = Duration::from_secs(10); + /// Identifier of inbound and outbound substreams from the handler's perspective. #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] pub struct SubstreamId(usize); @@ -140,9 +143,6 @@ where /// Waker, to be sure the handler gets polled when needed. waker: Option, - - /// Timeout that will be used for inbound and outbound responses. - resp_timeout: Duration, } enum HandlerState { @@ -224,7 +224,6 @@ where pub fn new( listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, - resp_timeout: Duration, peer_id: PeerId, connection_id: ConnectionId, ) -> Self { @@ -246,7 +245,6 @@ where outbound_io_error_retries: 0, fork_context, waker: None, - resp_timeout, } } @@ -542,8 +540,7 @@ where // If this substream has not ended, we reset the timer. // Each chunk is allowed RESPONSE_TIMEOUT to be sent. if let Some(ref delay_key) = info.delay_key { - self.inbound_substreams_delay - .reset(delay_key, self.resp_timeout); + self.inbound_substreams_delay.reset(delay_key, RESP_TIMEOUT); } // The stream may be currently idle. Attempt to process more @@ -712,7 +709,7 @@ where }; substream_entry.max_remaining_chunks = Some(max_remaining_chunks); self.outbound_substreams_delay - .reset(delay_key, self.resp_timeout); + .reset(delay_key, RESP_TIMEOUT); } } @@ -960,7 +957,7 @@ where // Store the stream and tag the output. let delay_key = self .inbound_substreams_delay - .insert(self.current_inbound_substream_id, self.resp_timeout); + .insert(self.current_inbound_substream_id, RESP_TIMEOUT); let awaiting_stream = InboundState::Idle(substream); self.inbound_substreams.insert( self.current_inbound_substream_id, @@ -1036,7 +1033,7 @@ where // new outbound request. Store the stream and tag the output. let delay_key = self .outbound_substreams_delay - .insert(self.current_outbound_substream_id, self.resp_timeout); + .insert(self.current_outbound_substream_id, RESP_TIMEOUT); let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { substream: Box::new(substream), request, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 5e8e55891c..7c43018af8 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; use tracing::{debug, trace}; use types::{EthSpec, ForkContext}; @@ -143,12 +142,6 @@ pub struct RPCMessage { type BehaviourAction = ToSwarm, RPCSend>; -pub struct NetworkParams { - pub max_payload_size: usize, - pub ttfb_timeout: Duration, - pub resp_timeout: Duration, -} - /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { @@ -162,8 +155,6 @@ pub struct RPC { events: Vec>, fork_context: Arc, enable_light_client_server: bool, - /// Networking constant values - network_params: NetworkParams, /// A sequential counter indicating when data gets modified. seq_number: u64, } @@ -174,7 +165,6 @@ impl RPC { enable_light_client_server: bool, inbound_rate_limiter_config: Option, outbound_rate_limiter_config: Option, - network_params: NetworkParams, seq_number: u64, ) -> Self { let response_limiter = inbound_rate_limiter_config.map(|config| { @@ -194,7 +184,6 @@ impl RPC { events: Vec::new(), fork_context, enable_light_client_server, - network_params, seq_number, } } @@ -331,18 +320,11 @@ where max_rpc_size: self.fork_context.spec.max_payload_size as usize, enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, - ttfb_timeout: self.network_params.ttfb_timeout, }, (), ); - let handler = RPCHandler::new( - protocol, - self.fork_context.clone(), - self.network_params.resp_timeout, - peer_id, - connection_id, - ); + let handler = RPCHandler::new(protocol, self.fork_context.clone(), peer_id, connection_id); Ok(handler) } @@ -361,18 +343,11 @@ where max_rpc_size: self.fork_context.spec.max_payload_size as usize, enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, - ttfb_timeout: self.network_params.ttfb_timeout, }, (), ); - let handler = RPCHandler::new( - protocol, - self.fork_context.clone(), - self.network_params.resp_timeout, - peer_id, - connection_id, - ); + let handler = RPCHandler::new(protocol, self.fork_context.clone(), peer_id, connection_id); Ok(handler) } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 6529ff5f92..228a74f08c 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -11,7 +11,6 @@ use std::marker::PhantomData; use std::sync::{Arc, LazyLock}; use std::time::Duration; use strum::{AsRefStr, Display, EnumString, IntoStaticStr}; -use tokio_io_timeout::TimeoutStream; use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, @@ -425,7 +424,6 @@ pub struct RPCProtocol { pub max_rpc_size: usize, pub enable_light_client_server: bool, pub phantom: PhantomData, - pub ttfb_timeout: Duration, } impl UpgradeInfo for RPCProtocol { @@ -652,7 +650,7 @@ pub fn rpc_data_column_limits( pub type InboundOutput = (RequestType, InboundFramed); pub type InboundFramed = - Framed>>>, SSZSnappyInboundCodec>; + Framed>>, SSZSnappyInboundCodec>; impl InboundUpgrade for RPCProtocol where @@ -676,10 +674,7 @@ where ), }; - let mut timed_socket = TimeoutStream::new(socket); - timed_socket.set_read_timeout(Some(self.ttfb_timeout)); - - let socket = Framed::new(Box::pin(timed_socket), codec); + let socket = Framed::new(Box::pin(socket), codec); // MetaData requests should be empty, return the stream match versioned_protocol { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index eebc2f0200..9edb70555d 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -12,8 +12,8 @@ use crate::peer_manager::{ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; use crate::rpc::methods::MetadataRequest; use crate::rpc::{ - GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPC, RPCError, - RPCMessage, RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, + GoodbyeReason, HandlerErr, InboundRequestId, Protocol, RPC, RPCError, RPCMessage, RPCReceived, + RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, }; use crate::types::{ GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, @@ -367,17 +367,11 @@ impl Network { (gossipsub, update_gossipsub_scores) }; - let network_params = NetworkParams { - max_payload_size: ctx.chain_spec.max_payload_size as usize, - ttfb_timeout: ctx.chain_spec.ttfb_timeout(), - resp_timeout: ctx.chain_spec.resp_timeout(), - }; let eth2_rpc = RPC::new( ctx.fork_context.clone(), config.enable_light_client_server, config.inbound_rate_limiter_config.clone(), config.outbound_rate_limiter_config.clone(), - network_params, seq_number, );