From 56eb81a5e0a383a7a8789cfe12a6b684fc3aadf6 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 10 Feb 2026 13:52:52 -0800 Subject: [PATCH 1/4] Implement weak subjectivity safety checks (#7347) Closes #7273 https://github.com/ethereum/consensus-specs/pull/4179 Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Eitan Seri- Levi Co-Authored-By: Michael Sproul Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/builder.rs | 29 ++++- beacon_node/beacon_chain/src/chain_config.rs | 3 + .../tests/payload_invalidation.rs | 9 +- beacon_node/beacon_chain/tests/store_tests.rs | 1 + beacon_node/beacon_chain/tests/tests.rs | 49 ++++++++- beacon_node/src/cli.rs | 10 ++ beacon_node/src/config.rs | 2 + beacon_node/src/lib.rs | 7 +- book/src/help_bn.md | 6 + consensus/types/src/state/beacon_state.rs | 104 ++++++++++++++++++ consensus/types/src/state/mod.rs | 2 +- lighthouse/tests/beacon_node.rs | 15 +++ 12 files changed, 222 insertions(+), 15 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index e5b656adf8..cf6cb1598b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -41,7 +41,7 @@ use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; use types::data::CustodyIndex; use types::{ @@ -848,6 +848,33 @@ where )); } + // Check if the head snapshot is within the weak subjectivity period + let head_state = &head_snapshot.beacon_state; + let Ok(ws_period) = head_state.compute_weak_subjectivity_period(&self.spec) else { + return Err(format!( + "Unable to compute the weak subjectivity period at the head snapshot slot: {:?}", + head_state.slot() + )); + }; + if current_slot.epoch(E::slots_per_epoch()) + > 
head_state.slot().epoch(E::slots_per_epoch()) + ws_period + { + if !self.chain_config.ignore_ws_check { + return Err( + "The current head state is outside the weak subjectivity period. A node in this state is susceptible to long range attacks. You should purge your db and \ + checkpoint sync. For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \ + If you understand the risks, it is possible to ignore this error with the --ignore-ws-check flag.".to_string() + ); + } + warn!( + head_slot=%head_state.slot(), + %current_slot, + "The current head state is outside the weak subjectivity period. You are currently running a node that is susceptible to long range attacks. \ + It is highly recommended to purge your db and checkpoint sync. For more information please \ + read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity" + ); + } + let validator_pubkey_cache = self .validator_pubkey_cache .map(|mut validator_pubkey_cache| { diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 711ffdc99c..ad923000e2 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -117,6 +117,8 @@ pub struct ChainConfig { /// On Holesky there is a block which is added to this set by default but which can be removed /// by using `--invalid-block-roots ""`. pub invalid_block_roots: HashSet, + /// When set to true, the beacon node can be started even if the head state is outside the weak subjectivity period. + pub ignore_ws_check: bool, /// Disable the getBlobs optimisation to fetch blobs from the EL mempool. pub disable_get_blobs: bool, /// The node's custody type, determining how many data columns to custody and sample.
@@ -160,6 +162,7 @@ impl Default for ChainConfig { block_publishing_delay: None, data_column_publishing_delay: None, invalid_block_roots: HashSet::new(), + ignore_ws_check: false, disable_get_blobs: false, node_custody_type: NodeCustodyType::Fullnode, } diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 1204412d65..f1e52de27b 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -6,7 +6,7 @@ use beacon_chain::{ INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, NotifyExecutionLayer, OverrideForkchoiceUpdate, StateSkipConfig, WhenSlotSkipped, canonical_head::{CachedHead, CanonicalHead}, - test_utils::{BeaconChainHarness, EphemeralHarnessType}, + test_utils::{BeaconChainHarness, EphemeralHarnessType, test_spec}, }; use execution_layer::{ ExecutionLayer, ForkchoiceState, PayloadAttributes, @@ -42,14 +42,11 @@ struct InvalidPayloadRig { impl InvalidPayloadRig { fn new() -> Self { - let spec = E::default_spec(); + let spec = test_spec::(); Self::new_with_spec(spec) } - fn new_with_spec(mut spec: ChainSpec) -> Self { - spec.altair_fork_epoch = Some(Epoch::new(0)); - spec.bellatrix_fork_epoch = Some(Epoch::new(0)); - + fn new_with_spec(spec: ChainSpec) -> Self { let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.into()) .chain_config(ChainConfig { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ea5f735bde..5410f26a5d 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -117,6 +117,7 @@ fn get_harness_import_all_data_columns( ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. 
let chain_config = ChainConfig { + ignore_ws_check: true, reconstruct_historic_states: true, ..ChainConfig::default() }; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 1884429a6a..fb86a1a845 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -15,7 +15,8 @@ use state_processing::EpochProcessingError; use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError}; use std::sync::LazyLock; use types::{ - BeaconState, BeaconStateError, BlockImportSource, Checkpoint, EthSpec, Hash256, MinimalEthSpec, + BeaconState, BeaconStateError, BlockImportSource, ChainSpec, Checkpoint, + DEFAULT_PRE_ELECTRA_WS_PERIOD, EthSpec, ForkName, Hash256, MainnetEthSpec, MinimalEthSpec, RelativeEpoch, Slot, }; @@ -38,6 +39,27 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness BeaconChainHarness> { + let chain_config = ChainConfig { + reconstruct_historic_states: true, + ..Default::default() + }; + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.clone().into()) + .chain_config(chain_config) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness +} + fn get_harness_with_config( validator_count: usize, chain_config: ChainConfig, @@ -1083,3 +1105,28 @@ async fn pseudo_finalize_with_lagging_split_update() { let expect_true_migration = false; pseudo_finalize_test_generic(epochs_per_migration, expect_true_migration).await; } + +#[tokio::test] +async fn test_compute_weak_subjectivity_period() { + type E = MainnetEthSpec; + let expected_ws_period_pre_electra = DEFAULT_PRE_ELECTRA_WS_PERIOD; + let expected_ws_period_post_electra = 256; + + // test Base variant + let spec = ForkName::Altair.make_genesis_spec(E::default_spec()); + let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec); + let head_state = 
harness.get_current_state(); + + let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period_pre_electra); + + // test Electra variant + let spec = ForkName::Electra.make_genesis_spec(E::default_spec()); + let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec); + let head_state = harness.get_current_state(); + + let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period_post_electra); +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9553fe60ba..5c3e8058d9 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1404,6 +1404,16 @@ pub fn cli_app() -> Command { .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("ignore-ws-check") + .long("ignore-ws-check") + .help("Using this flag allows a node to run in a state that may expose it to long-range attacks. \ + For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \ + If you understand the risks, you can use this flag to disable the Weak Subjectivity check at startup.") + .action(ArgAction::SetTrue) + .help_heading(FLAG_HEADER) + .display_order(0) + ) .arg( Arg::new("builder-fallback-skips") .long("builder-fallback-skips") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 752cf10550..e6091d9213 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -780,6 +780,8 @@ pub fn get_config( client_config.chain.paranoid_block_proposal = cli_args.get_flag("paranoid-block-proposal"); + client_config.chain.ignore_ws_check = cli_args.get_flag("ignore-ws-check"); + /* * Builder fallback configs. 
*/ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 6db2150e5f..e33da17e26 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -22,14 +22,9 @@ use types::{ChainSpec, Epoch, EthSpec, ForkName}; pub type ProductionClient = Client, BeaconNodeBackend>>; -/// The beacon node `Client` that will be used in production. +/// The beacon node `Client` that is used in production. /// /// Generic over some `EthSpec`. -/// -/// ## Notes: -/// -/// Despite being titled `Production...`, this code is not ready for production. The name -/// demonstrates an intention, not a promise. pub struct ProductionBeaconNode(ProductionClient); impl ProductionBeaconNode { diff --git a/book/src/help_bn.md b/book/src/help_bn.md index d3aa27c8a7..beb74da376 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -509,6 +509,12 @@ Flags: --http-enable-tls Serves the RESTful HTTP API server over TLS. This feature is currently experimental. + --ignore-ws-check + Using this flag allows a node to run in a state that may expose it to + long-range attacks. For more information please read this blog post: + https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity + If you understand the risks, you can use this flag to disable the Weak + Subjectivity check at startup. --import-all-attestations Import and aggregate all attestations, regardless of validator subscriptions. This will only import attestations from diff --git a/consensus/types/src/state/beacon_state.rs b/consensus/types/src/state/beacon_state.rs index 1352ded79e..1745908c40 100644 --- a/consensus/types/src/state/beacon_state.rs +++ b/consensus/types/src/state/beacon_state.rs @@ -55,9 +55,21 @@ use crate::{ }; pub const CACHED_EPOCHS: usize = 3; + +// Pre-electra WS calculations are not supported. On mainnet, pre-electra epochs are outside the weak subjectivity +// period. 
The default pre-electra WS value is set to 256 to allow for `basic-sim`, `fallback-sim` test case `revert_minority_fork_on_resume` +// to pass. 256 is a small enough number to trigger the WS safety check pre-electra on mainnet. +pub const DEFAULT_PRE_ELECTRA_WS_PERIOD: u64 = 256; + const MAX_RANDOM_BYTE: u64 = (1 << 8) - 1; const MAX_RANDOM_VALUE: u64 = (1 << 16) - 1; +// `SAFETY_DECAY` is defined as the maximum percentage tolerable loss in the one-third +// safety margin of FFG finality. Thus, any attack exploiting the Weak Subjectivity Period has +// a safety margin of at least `1/3 - SAFETY_DECAY/100`. +// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/phase0/weak-subjectivity.md?plain=1#L50-L71 +const SAFETY_DECAY: u64 = 10; + pub type Validators = List::ValidatorRegistryLimit>; pub type Balances = List::ValidatorRegistryLimit>; @@ -3007,6 +3019,26 @@ impl BeaconState { Ok(()) } + /// Returns the weak subjectivity period for `self` + pub fn compute_weak_subjectivity_period( + &self, + spec: &ChainSpec, + ) -> Result { + let total_active_balance = self.get_total_active_balance()?; + let fork_name = self.fork_name_unchecked(); + + if fork_name.electra_enabled() { + let balance_churn_limit = self.get_balance_churn_limit(spec)?; + compute_weak_subjectivity_period_electra( + total_active_balance, + balance_churn_limit, + spec, + ) + } else { + Ok(Epoch::new(DEFAULT_PRE_ELECTRA_WS_PERIOD)) + } + } + /// Get the payload timeliness committee for the given `slot`. /// /// Requires the committee cache to be initialized.
@@ -3382,3 +3414,75 @@ impl<'de, E: EthSpec> ContextDeserialize<'de, ForkName> for BeaconState { )) } } + +/// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L30 +pub fn compute_weak_subjectivity_period_electra( + total_active_balance: u64, + balance_churn_limit: u64, + spec: &ChainSpec, +) -> Result { + let epochs_for_validator_set_churn = SAFETY_DECAY + .safe_mul(total_active_balance)? + .safe_div(balance_churn_limit.safe_mul(200)?)?; + let ws_period = spec + .min_validator_withdrawability_delay + .safe_add(epochs_for_validator_set_churn)?; + + Ok(ws_period) +} + +#[cfg(test)] +mod weak_subjectivity_tests { + use crate::state::beacon_state::compute_weak_subjectivity_period_electra; + use crate::{ChainSpec, Epoch, EthSpec, MainnetEthSpec}; + + const GWEI_PER_ETH: u64 = 1_000_000_000; + + #[test] + fn test_compute_weak_subjectivity_period_electra() { + let mut spec = MainnetEthSpec::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + spec.capella_fork_epoch = Some(Epoch::new(0)); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.electra_fork_epoch = Some(Epoch::new(0)); + + // A table of some expected values: + // https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L44-L54 + // (total_active_balance, expected_ws_period) + let expected_values: Vec<(u64, u64)> = vec![ + (1_048_576 * GWEI_PER_ETH, 665), + (2_097_152 * GWEI_PER_ETH, 1_075), + (4_194_304 * GWEI_PER_ETH, 1_894), + (8_388_608 * GWEI_PER_ETH, 3_532), + (16_777_216 * GWEI_PER_ETH, 3_532), + (33_554_432 * GWEI_PER_ETH, 3_532), + // This value cross referenced w/ + // beacon_chain/tests/tests.rs:test_compute_weak_subjectivity_period + (1536 * GWEI_PER_ETH, 256), + ]; + + for (total_active_balance, expected_ws_period) in expected_values { + let balance_churn_limit = 
get_balance_churn_limit(total_active_balance, &spec); + + let calculated_ws_period = compute_weak_subjectivity_period_electra( + total_active_balance, + balance_churn_limit, + &spec, + ) + .unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period); + } + } + + // calculate the balance_churn_limit without dealing with states + // and without initializing the active balance cache + fn get_balance_churn_limit(total_active_balance: u64, spec: &ChainSpec) -> u64 { + let churn = std::cmp::max( + spec.min_per_epoch_churn_limit_electra, + total_active_balance / spec.churn_limit_quotient, + ); + churn - (churn % spec.effective_balance_increment) + } +} diff --git a/consensus/types/src/state/mod.rs b/consensus/types/src/state/mod.rs index 309796d359..ea064fb7ac 100644 --- a/consensus/types/src/state/mod.rs +++ b/consensus/types/src/state/mod.rs @@ -17,7 +17,7 @@ pub use balance::Balance; pub use beacon_state::{ BeaconState, BeaconStateAltair, BeaconStateBase, BeaconStateBellatrix, BeaconStateCapella, BeaconStateDeneb, BeaconStateElectra, BeaconStateError, BeaconStateFulu, BeaconStateGloas, - BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, + BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, DEFAULT_PRE_ELECTRA_WS_PERIOD, }; pub use committee_cache::{ CommitteeCache, compute_committee_index_in_epoch, compute_committee_range_in_epoch, diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a2fad31f65..322787736b 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -295,6 +295,21 @@ fn paranoid_block_proposal_on() { .with_config(|config| assert!(config.chain.paranoid_block_proposal)); } +#[test] +fn ignore_ws_check_enabled() { + CommandLineTest::new() + .flag("ignore-ws-check", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.ignore_ws_check)); +} + +#[test] +fn ignore_ws_check_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config|
assert!(!config.chain.ignore_ws_check)); +} + #[test] fn reset_payload_statuses_default() { CommandLineTest::new() From e1d3dcc8dc19f137756f75c42c592964a07adfae Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Wed, 11 Feb 2026 07:49:20 +0900 Subject: [PATCH 2/4] Penalize peers that send an invalid rpc request (#6986) Since https://github.com/sigp/lighthouse/pull/6847, invalid `BlocksByRange`/`BlobsByRange` requests, which do not comply with the spec, are [handled in the Handler](https://github.com/sigp/lighthouse/blob/3d16d1080f5b93193404967dcb5525fa68840ea0/beacon_node/lighthouse_network/src/rpc/handler.rs#L880-L911). Any peer that sends an invalid request is penalized and disconnected. However, other kinds of invalid rpc request, which result in decoding errors, are just dropped. No penalty is applied and the connection with the peer remains. I have added handling for the `ListenUpgradeError` event to notify the application of an `RPCError:InvalidData` error and disconnect to the peer that sent the invalid rpc request. I also added tests for handling invalid rpc requests. 
Co-Authored-By: ackintosh --- .../lighthouse_network/src/rpc/handler.rs | 17 +- .../lighthouse_network/src/rpc/protocol.rs | 10 +- .../lighthouse_network/tests/rpc_tests.rs | 160 +++++++++++++++++- 3 files changed, 179 insertions(+), 8 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 720895bbe7..9861119ac1 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -13,7 +13,8 @@ use futures::prelude::*; use libp2p::PeerId; use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError, + SubstreamProtocol, }; use libp2p::swarm::{ConnectionId, Stream}; use logging::crit; @@ -888,6 +889,16 @@ where ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { self.on_dial_upgrade_error(info, error) } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: (proto, error), + .. 
+ }) => { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto, + error, + })); + } _ => { // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on // release notes more than compiler feedback @@ -924,7 +935,7 @@ where request.count() )), })); - return self.shutdown(None); + return; } } RequestType::BlobsByRange(request) => { @@ -940,7 +951,7 @@ where max_allowed, max_requested_blobs )), })); - return self.shutdown(None); + return; } } _ => {} diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index f0ac9d00f9..34d8efccd1 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -675,7 +675,7 @@ where E: EthSpec, { type Output = InboundOutput; - type Error = RPCError; + type Error = (Protocol, RPCError); type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { @@ -717,10 +717,12 @@ where ) .await { - Err(e) => Err(RPCError::from(e)), + Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(e)), _)) => Err(e), - Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)), + Ok((None, _)) => { + Err((versioned_protocol.protocol(), RPCError::IncompleteStream)) + } } } } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 53939687d3..debe30b34f 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -5,8 +5,12 @@ use crate::common::spec_with_all_forks_enabled; use crate::common::{Protocol, build_tracing_subscriber}; use bls::Signature; use fixed_bytes::FixedBytesExtended; +use libp2p::PeerId; use 
lighthouse_network::rpc::{RequestType, methods::*}; -use lighthouse_network::service::api_types::AppRequestId; +use lighthouse_network::service::api_types::{ + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId, +}; use lighthouse_network::{NetworkEvent, ReportSource, Response}; use ssz::Encode; use ssz_types::{RuntimeVariableList, VariableList}; @@ -1783,3 +1787,157 @@ fn test_active_requests() { } }) } + +// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count, +// it bans the sender. +#[test] +fn test_request_too_large_blocks_by_range() { + let spec = Arc::new(spec_with_all_forks_enabled()); + + test_request_too_large( + AppRequestId::Sync(SyncRequestId::BlocksByRange(BlocksByRangeRequestId { + id: 1, + parent_request_id: ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + })), + RequestType::BlocksByRange(OldBlocksByRangeRequest::new( + 0, + spec.max_request_blocks(ForkName::Base) as u64 + 1, // exceeds the max request defined in the spec. + 1, + )), + ); +} + +// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count, +// it bans the sender. 
+#[test] +fn test_request_too_large_blobs_by_range() { + let spec = Arc::new(spec_with_all_forks_enabled()); + + let max_request_blobs_count = spec.max_request_blob_sidecars(ForkName::Base) as u64 + / spec.max_blobs_per_block_within_fork(ForkName::Base); + test_request_too_large( + AppRequestId::Sync(SyncRequestId::BlobsByRange(BlobsByRangeRequestId { + id: 1, + parent_request_id: ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + })), + RequestType::BlobsByRange(BlobsByRangeRequest { + start_slot: 0, + count: max_request_blobs_count + 1, // exceeds the max request defined in the spec. + }), + ); +} + +// Test that when a node receives an invalid DataColumnsByRange request exceeding the columns count, +// it bans the sender. +#[test] +fn test_request_too_large_data_columns_by_range() { + test_request_too_large( + AppRequestId::Sync(SyncRequestId::DataColumnsByRange( + DataColumnsByRangeRequestId { + id: 1, + parent_request_id: DataColumnsByRangeRequester::ComponentsByRange( + ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + ), + peer: PeerId::random(), + }, + )), + RequestType::DataColumnsByRange(DataColumnsByRangeRequest { + start_slot: 0, + count: 0, + // exceeds the max request defined in the spec. + columns: vec![0; E::number_of_columns() + 1], + }), + ); +} + +fn test_request_too_large(app_request_id: AppRequestId, request: RequestType) { + // Set up the logging. 
+ let log_level = "debug"; + let enable_logging = true; + let _subscriber = build_tracing_subscriber(log_level, enable_logging); + let rt = Arc::new(Runtime::new().unwrap()); + let spec = Arc::new(spec_with_all_forks_enabled()); + + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + Protocol::Tcp, + false, + None, + ) + .await; + + // Build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(?request, %peer_id, "Sending RPC request"); + sender + .send_request(peer_id, app_request_id, request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + app_request_id, + response, + .. + } => { + debug!(?app_request_id, ?response, "Received response"); + } + NetworkEvent::RPCFailed { error, .. } => { + // This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit. + debug!(?error, "RPC failed"); + unreachable!(); + } + NetworkEvent::PeerDisconnected(peer_id) => { + // The receiver should disconnect as a result of the invalid request. + debug!(%peer_id, "Peer disconnected"); + // End the test. + return; + } + _ => {} + } + } + } + .instrument(info_span!("Sender")); + + // Build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await { + // This event should be unreachable, as the handler drops the invalid request. + unreachable!(); + } + } + } + .instrument(info_span!("Receiver")); + + tokio::select! 
{ + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }); +} From 889946c04b15798cb8314e284cbf866e76523632 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Wed, 11 Feb 2026 08:14:28 +0900 Subject: [PATCH 3/4] Remove pending requests from ready_requests (#6625) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: ackintosh Co-Authored-By: João Oliveira --- .../src/rpc/rate_limiter.rs | 8 ++ .../src/rpc/self_limiter.rs | 100 ++++++++++++++++-- 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 8b364f506c..2407038bc3 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -77,6 +77,14 @@ impl Quota { max_tokens: n, } } + + #[cfg(test)] + pub const fn n_every_millis(n: NonZeroU64, millis: u64) -> Self { + Quota { + replenish_all_every: Duration::from_millis(millis), + max_tokens: n, + } + } } /// Manages rate limiting of requests per peer, with differentiated rates per protocol. 
diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 90e2db9135..2a7ef955a1 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -4,6 +4,10 @@ use super::{ rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, }; use crate::rpc::rate_limiter::RateLimiterItem; +use futures::FutureExt; +use libp2p::{PeerId, swarm::NotifyHandler}; +use logging::crit; +use smallvec::SmallVec; use std::time::{SystemTime, UNIX_EPOCH}; use std::{ collections::{HashMap, VecDeque, hash_map::Entry}, @@ -11,11 +15,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use futures::FutureExt; -use libp2p::{PeerId, swarm::NotifyHandler}; -use logging::crit; -use smallvec::SmallVec; use tokio_util::time::DelayQueue; use tracing::debug; use types::{EthSpec, ForkContext}; @@ -234,9 +233,29 @@ impl SelfRateLimiter { pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { self.active_requests.remove(&peer_id); + let mut failed_requests = Vec::new(); + + self.ready_requests.retain(|(req_peer_id, rpc_send, _)| { + if let RPCSend::Request(request_id, req) = rpc_send { + if req_peer_id == &peer_id { + failed_requests.push((*request_id, req.protocol())); + // Remove the entry + false + } else { + // Keep the entry + true + } + } else { + debug_assert!( + false, + "Coding error: unexpected RPCSend variant {rpc_send:?}." + ); + false + } + }); + // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // should never really be large. 
So we iterate for simplicity - let mut failed_requests = Vec::new(); self.delayed_requests .retain(|(map_peer_id, protocol), queue| { if map_peer_id == &peer_id { @@ -252,6 +271,7 @@ impl SelfRateLimiter { true } }); + failed_requests } @@ -549,4 +569,72 @@ mod tests { .contains_key(&(peer2, Protocol::Ping)) ); } + + /// Test that `peer_disconnected` returns the IDs of pending requests. + #[tokio::test] + async fn test_peer_disconnected_returns_failed_requests() { + const REPLENISH_DURATION: u64 = 50; + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let config = OutboundRateLimiterConfig(RateLimiterConfig { + ping_quota: Quota::n_every_millis(NonZeroU64::new(1).unwrap(), REPLENISH_DURATION), + ..Default::default() + }); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(Some(config), fork_context).unwrap(); + let peer_id = PeerId::random(); + + for i in 1..=5u32 { + let result = limiter.allows( + peer_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + req_id: i, + lookup_id: i, + }, + }), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first request while other requests are added to the queue. + if i == 1 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + + // Wait until the tokens have been regenerated, then run `next_peer_request_ready`. + tokio::time::sleep(Duration::from_millis(REPLENISH_DURATION + 10)).await; + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + // Check that one of the pending requests has moved to ready_requests. + assert_eq!( + limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap() + .len(), + 3 + ); + assert_eq!(limiter.ready_requests.len(), 1); + + let mut failed_requests = limiter.peer_disconnected(peer_id); + + // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly. 
+ assert_eq!(failed_requests.len(), 4); + for i in 2..=5u32 { + let (request_id, protocol) = failed_requests.remove(0); + assert!(matches!( + request_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + }) if req_id == i + )); + assert_eq!(protocol, Protocol::Ping); + } + } } From 8d72cc34ebe3c94c0856a0c9534943179e11b39f Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 10 Feb 2026 16:40:01 -0700 Subject: [PATCH 4/4] Add sync request metrics (#7790) Add error rates metrics on unstable to benchmark against tree-sync. In my branch there are frequent errors but mostly connections errors as the node is still finding it set of stable peers. These metrics are very useful and unstable can benefit from them ahead of tree-sync Add three new metrics: - sync_rpc_requests_success_total: Total count of sync RPC requests successes - sync_rpc_requests_error_total: Total count of sync RPC requests errors - sync_rpc_request_duration_sec: Time to complete a successful sync RPC requesst Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/network/src/metrics.rs | 24 ++++++++ .../network/src/sync/network_context.rs | 37 +++---------- .../src/sync/network_context/requests.rs | 55 ++++++++++++++++--- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0016f66c01 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock> = Lazy &["type"], ) }); +pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_success_total", + "Total count of sync RPC requests successes", + &["protocol"], + ) +}); +pub static SYNC_RPC_REQUEST_ERRORS: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + 
"sync_rpc_requests_error_total", + "Total count of sync RPC requests errors", + &["protocol", "error"], + ) +}); +pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "sync_rpc_request_duration_sec", + "Time to complete a successful sync RPC requesst", + Ok(vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0, + ]), + &["protocol"], + ) +}); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7f4da9c0da..542625b8a3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1430,7 +1430,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } pub(crate) fn on_single_blob_response( @@ -1459,7 +1459,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1472,7 +1472,7 @@ impl SyncNetworkContext { let resp = self .data_columns_by_root_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1483,7 +1483,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blocks_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1494,7 +1494,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blobs_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) + 
self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1507,36 +1507,15 @@ impl SyncNetworkContext { let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) + self.on_rpc_response_result(resp, peer_id) } - fn on_rpc_response_result usize>( + /// Common handler for consistent scoring of RpcResponseError + fn on_rpc_response_result( &mut self, - id: I, - method: &'static str, resp: Option>, peer_id: PeerId, - get_count: F, ) -> Option> { - match &resp { - None => {} - Some(Ok((v, _))) => { - debug!( - %id, - method, - count = get_count(v), - "Sync RPC request completed" - ); - } - Some(Err(e)) => { - debug!( - %id, - method, - error = ?e, - "Sync RPC request error" - ); - } - } if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 3183c06d76..8f9540693e 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,11 @@ +use std::time::Instant; use std::{collections::hash_map::Entry, hash::Hash}; use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use tracing::Span; +use tracing::{Span, debug}; use types::{Hash256, Slot}; pub use blobs_by_range::BlobsByRangeRequestItems; @@ -18,7 +19,7 @@ pub use data_columns_by_root::{ use crate::metrics; -use super::{RpcEvent, RpcResponseResult}; +use super::{RpcEvent, RpcResponseError, RpcResponseResult}; mod blobs_by_range; mod blobs_by_root; @@ -51,6 +52,7 @@ struct ActiveRequest { peer_id: PeerId, // Error if the request terminates before receiving max expected responses expect_max_responses: bool, + start_instant: Instant, span: 
Span, } @@ -60,7 +62,7 @@ enum State { Errored, } -impl ActiveRequests { +impl ActiveRequests { pub fn new(name: &'static str) -> Self { Self { requests: <_>::default(), @@ -83,6 +85,7 @@ impl ActiveRequests { state: State::Active(items), peer_id, expect_max_responses, + start_instant: Instant::now(), span, }, ); @@ -112,7 +115,7 @@ impl ActiveRequests { return None; }; - match rpc_event { + let result = match rpc_event { // Handler of a success ReqResp chunk. Adds the item to the request accumulator. // `ActiveRequestItems` validates the item before appending to its internal state. RpcEvent::Response(item, seen_timestamp) => { @@ -126,7 +129,7 @@ impl ActiveRequests { Ok(true) => { let items = items.consume(); request.state = State::CompletedEarly; - Some(Ok((items, seen_timestamp))) + Some(Ok((items, seen_timestamp, request.start_instant.elapsed()))) } // Received item, but we are still expecting more Ok(false) => None, @@ -163,7 +166,11 @@ impl ActiveRequests { } .into())) } else { - Some(Ok((items.consume(), timestamp_now()))) + Some(Ok(( + items.consume(), + timestamp_now(), + request.start_instant.elapsed(), + ))) } } // Items already returned, ignore stream termination @@ -188,7 +195,41 @@ impl ActiveRequests { State::Errored => None, } } - } + }; + + result.map(|result| match result { + Ok((items, seen_timestamp, duration)) => { + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]); + metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration); + debug!( + %id, + method = self.name, + count = items.len(), + "Sync RPC request completed" + ); + + Ok((items, seen_timestamp)) + } + Err(e) => { + let err_str: &'static str = match &e { + RpcResponseError::RpcError(e) => e.into(), + RpcResponseError::VerifyError(e) => e.into(), + RpcResponseError::CustodyRequestError(_) => "CustodyRequestError", + RpcResponseError::BlockComponentCouplingError(_) => { + "BlockComponentCouplingError" + } + }; + 
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]); + debug!( + %id, + method = self.name, + error = ?e, + "Sync RPC request error" + ); + + Err(e) + } + }) } pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {