Remove ttfb_timeout and resp_timeout (#7925)

`TTFB_TIMEOUT` was deprecated in https://github.com/ethereum/consensus-specs/pull/3767.


  Remove `ttfb_timeout` from `InboundUpgrade` and other related structs.

(Update)
Also removed `resp_timeout`, and removed the `NetworkParams` struct since its fields are no longer used. https://github.com/sigp/lighthouse/pull/7925#issuecomment-3226886352
This commit is contained in:
Akihito Nakano
2025-09-03 11:00:15 +09:00
committed by GitHub
parent a9db8523a2
commit 7b5be8b1e7
6 changed files with 13 additions and 64 deletions

11
Cargo.lock generated
View File

@@ -5642,7 +5642,6 @@ dependencies = [
"tempfile", "tempfile",
"tiny-keccak", "tiny-keccak",
"tokio", "tokio",
"tokio-io-timeout",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@@ -9432,16 +9431,6 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.5.0" version = "2.5.0"

View File

@@ -45,7 +45,6 @@ superstruct = { workspace = true }
task_executor = { workspace = true } task_executor = { workspace = true }
tiny-keccak = "2" tiny-keccak = "2"
tokio = { workspace = true } tokio = { workspace = true }
tokio-io-timeout = "1"
tokio-util = { workspace = true } tokio-util = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }

View File

@@ -39,6 +39,9 @@ const SHUTDOWN_TIMEOUT_SECS: u64 = 15;
/// Maximum number of simultaneous inbound substreams we keep for this peer. /// Maximum number of simultaneous inbound substreams we keep for this peer.
const MAX_INBOUND_SUBSTREAMS: usize = 32; const MAX_INBOUND_SUBSTREAMS: usize = 32;
/// Timeout that will be used for inbound and outbound responses.
const RESP_TIMEOUT: Duration = Duration::from_secs(10);
/// Identifier of inbound and outbound substreams from the handler's perspective. /// Identifier of inbound and outbound substreams from the handler's perspective.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize); pub struct SubstreamId(usize);
@@ -140,9 +143,6 @@ where
/// Waker, to be sure the handler gets polled when needed. /// Waker, to be sure the handler gets polled when needed.
waker: Option<std::task::Waker>, waker: Option<std::task::Waker>,
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
} }
enum HandlerState { enum HandlerState {
@@ -224,7 +224,6 @@ where
pub fn new( pub fn new(
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>, listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
resp_timeout: Duration,
peer_id: PeerId, peer_id: PeerId,
connection_id: ConnectionId, connection_id: ConnectionId,
) -> Self { ) -> Self {
@@ -246,7 +245,6 @@ where
outbound_io_error_retries: 0, outbound_io_error_retries: 0,
fork_context, fork_context,
waker: None, waker: None,
resp_timeout,
} }
} }
@@ -542,8 +540,7 @@ where
// If this substream has not ended, we reset the timer. // If this substream has not ended, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent. // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
if let Some(ref delay_key) = info.delay_key { if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay self.inbound_substreams_delay.reset(delay_key, RESP_TIMEOUT);
.reset(delay_key, self.resp_timeout);
} }
// The stream may be currently idle. Attempt to process more // The stream may be currently idle. Attempt to process more
@@ -712,7 +709,7 @@ where
}; };
substream_entry.max_remaining_chunks = Some(max_remaining_chunks); substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
self.outbound_substreams_delay self.outbound_substreams_delay
.reset(delay_key, self.resp_timeout); .reset(delay_key, RESP_TIMEOUT);
} }
} }
@@ -960,7 +957,7 @@ where
// Store the stream and tag the output. // Store the stream and tag the output.
let delay_key = self let delay_key = self
.inbound_substreams_delay .inbound_substreams_delay
.insert(self.current_inbound_substream_id, self.resp_timeout); .insert(self.current_inbound_substream_id, RESP_TIMEOUT);
let awaiting_stream = InboundState::Idle(substream); let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert( self.inbound_substreams.insert(
self.current_inbound_substream_id, self.current_inbound_substream_id,
@@ -1036,7 +1033,7 @@ where
// new outbound request. Store the stream and tag the output. // new outbound request. Store the stream and tag the output.
let delay_key = self let delay_key = self
.outbound_substreams_delay .outbound_substreams_delay
.insert(self.current_outbound_substream_id, self.resp_timeout); .insert(self.current_outbound_substream_id, RESP_TIMEOUT);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream), substream: Box::new(substream),
request, request,

View File

@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, trace}; use tracing::{debug, trace};
use types::{EthSpec, ForkContext}; use types::{EthSpec, ForkContext};
@@ -143,12 +142,6 @@ pub struct RPCMessage<Id, E: EthSpec> {
type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>; type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>;
pub struct NetworkParams {
pub max_payload_size: usize,
pub ttfb_timeout: Duration,
pub resp_timeout: Duration,
}
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic. /// logic.
pub struct RPC<Id: ReqId, E: EthSpec> { pub struct RPC<Id: ReqId, E: EthSpec> {
@@ -162,8 +155,6 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
events: Vec<BehaviourAction<Id, E>>, events: Vec<BehaviourAction<Id, E>>,
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
enable_light_client_server: bool, enable_light_client_server: bool,
/// Networking constant values
network_params: NetworkParams,
/// A sequential counter indicating when data gets modified. /// A sequential counter indicating when data gets modified.
seq_number: u64, seq_number: u64,
} }
@@ -174,7 +165,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
enable_light_client_server: bool, enable_light_client_server: bool,
inbound_rate_limiter_config: Option<InboundRateLimiterConfig>, inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>, outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
network_params: NetworkParams,
seq_number: u64, seq_number: u64,
) -> Self { ) -> Self {
let response_limiter = inbound_rate_limiter_config.map(|config| { let response_limiter = inbound_rate_limiter_config.map(|config| {
@@ -194,7 +184,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
events: Vec::new(), events: Vec::new(),
fork_context, fork_context,
enable_light_client_server, enable_light_client_server,
network_params,
seq_number, seq_number,
} }
} }
@@ -331,18 +320,11 @@ where
max_rpc_size: self.fork_context.spec.max_payload_size as usize, max_rpc_size: self.fork_context.spec.max_payload_size as usize,
enable_light_client_server: self.enable_light_client_server, enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData, phantom: PhantomData,
ttfb_timeout: self.network_params.ttfb_timeout,
}, },
(), (),
); );
let handler = RPCHandler::new( let handler = RPCHandler::new(protocol, self.fork_context.clone(), peer_id, connection_id);
protocol,
self.fork_context.clone(),
self.network_params.resp_timeout,
peer_id,
connection_id,
);
Ok(handler) Ok(handler)
} }
@@ -361,18 +343,11 @@ where
max_rpc_size: self.fork_context.spec.max_payload_size as usize, max_rpc_size: self.fork_context.spec.max_payload_size as usize,
enable_light_client_server: self.enable_light_client_server, enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData, phantom: PhantomData,
ttfb_timeout: self.network_params.ttfb_timeout,
}, },
(), (),
); );
let handler = RPCHandler::new( let handler = RPCHandler::new(protocol, self.fork_context.clone(), peer_id, connection_id);
protocol,
self.fork_context.clone(),
self.network_params.resp_timeout,
peer_id,
connection_id,
);
Ok(handler) Ok(handler)
} }

View File

@@ -11,7 +11,6 @@ use std::marker::PhantomData;
use std::sync::{Arc, LazyLock}; use std::sync::{Arc, LazyLock};
use std::time::Duration; use std::time::Duration;
use strum::{AsRefStr, Display, EnumString, IntoStaticStr}; use strum::{AsRefStr, Display, EnumString, IntoStaticStr};
use tokio_io_timeout::TimeoutStream;
use tokio_util::{ use tokio_util::{
codec::Framed, codec::Framed,
compat::{Compat, FuturesAsyncReadCompatExt}, compat::{Compat, FuturesAsyncReadCompatExt},
@@ -425,7 +424,6 @@ pub struct RPCProtocol<E: EthSpec> {
pub max_rpc_size: usize, pub max_rpc_size: usize,
pub enable_light_client_server: bool, pub enable_light_client_server: bool,
pub phantom: PhantomData<E>, pub phantom: PhantomData<E>,
pub ttfb_timeout: Duration,
} }
impl<E: EthSpec> UpgradeInfo for RPCProtocol<E> { impl<E: EthSpec> UpgradeInfo for RPCProtocol<E> {
@@ -652,7 +650,7 @@ pub fn rpc_data_column_limits<E: EthSpec>(
pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>); pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
pub type InboundFramed<TSocket, E> = pub type InboundFramed<TSocket, E> =
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>; Framed<std::pin::Pin<Box<Compat<TSocket>>>, SSZSnappyInboundCodec<E>>;
impl<TSocket, E> InboundUpgrade<TSocket> for RPCProtocol<E> impl<TSocket, E> InboundUpgrade<TSocket> for RPCProtocol<E>
where where
@@ -676,10 +674,7 @@ where
), ),
}; };
let mut timed_socket = TimeoutStream::new(socket); let socket = Framed::new(Box::pin(socket), codec);
timed_socket.set_read_timeout(Some(self.ttfb_timeout));
let socket = Framed::new(Box::pin(timed_socket), codec);
// MetaData requests should be empty, return the stream // MetaData requests should be empty, return the stream
match versioned_protocol { match versioned_protocol {

View File

@@ -12,8 +12,8 @@ use crate::peer_manager::{
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
use crate::rpc::methods::MetadataRequest; use crate::rpc::methods::MetadataRequest;
use crate::rpc::{ use crate::rpc::{
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPC, RPCError, GoodbyeReason, HandlerErr, InboundRequestId, Protocol, RPC, RPCError, RPCMessage, RPCReceived,
RPCMessage, RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse,
}; };
use crate::types::{ use crate::types::{
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
@@ -367,17 +367,11 @@ impl<E: EthSpec> Network<E> {
(gossipsub, update_gossipsub_scores) (gossipsub, update_gossipsub_scores)
}; };
let network_params = NetworkParams {
max_payload_size: ctx.chain_spec.max_payload_size as usize,
ttfb_timeout: ctx.chain_spec.ttfb_timeout(),
resp_timeout: ctx.chain_spec.resp_timeout(),
};
let eth2_rpc = RPC::new( let eth2_rpc = RPC::new(
ctx.fork_context.clone(), ctx.fork_context.clone(),
config.enable_light_client_server, config.enable_light_client_server,
config.inbound_rate_limiter_config.clone(), config.inbound_rate_limiter_config.clone(),
config.outbound_rate_limiter_config.clone(), config.outbound_rate_limiter_config.clone(),
network_params,
seq_number, seq_number,
); );