diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 0079335d8d..37f7ca9811 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -41,7 +41,7 @@ use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; use types::data::CustodyIndex; use types::{ @@ -848,6 +848,34 @@ where )); } + // Check if the head snapshot is within the weak subjectivity period + let head_state = &head_snapshot.beacon_state; + let Ok(ws_period) = head_state.compute_weak_subjectivity_period(&self.spec) else { + return Err(format!( + "Unable to compute the weak subjectivity period at the head snapshot slot: {:?}", + head_state.slot() + )); + }; + if current_slot.epoch(E::slots_per_epoch()) + > head_state.slot().epoch(E::slots_per_epoch()) + ws_period + { + if self.chain_config.ignore_ws_check { + warn!( + head_slot=%head_state.slot(), + %current_slot, + "The current head state is outside the weak subjectivity period. You are currently running a node that is susceptible to long range attacks. \ + It is highly recommended to purge your db and checkpoint sync. For more information please \ + read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity" + ) + } else { + return Err( + "The current head state is outside the weak subjectivity period. A node in this state is susceptible to long range attacks. You should purge your db and \ + checkpoint sync. 
For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \ + If you understand the risks, it is possible to ignore this error with the --ignore-ws-check flag.".to_string() + ); + } + } + let validator_pubkey_cache = self + .validator_pubkey_cache + .map(|mut validator_pubkey_cache| { diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 711ffdc99c..ad923000e2 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -117,6 +117,8 @@ pub struct ChainConfig { /// On Holesky there is a block which is added to this set by default but which can be removed /// by using `--invalid-block-roots ""`. pub invalid_block_roots: HashSet, + /// When set to true, the beacon node can be started even if the head state is outside the weak subjectivity period. + pub ignore_ws_check: bool, /// Disable the getBlobs optimisation to fetch blobs from the EL mempool. pub disable_get_blobs: bool, /// The node's custody type, determining how many data columns to custody and sample. 
@@ -160,6 +162,7 @@ impl Default for ChainConfig { block_publishing_delay: None, data_column_publishing_delay: None, invalid_block_roots: HashSet::new(), + ignore_ws_check: false, disable_get_blobs: false, node_custody_type: NodeCustodyType::Fullnode, } diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 1204412d65..f1e52de27b 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -6,7 +6,7 @@ use beacon_chain::{ INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, NotifyExecutionLayer, OverrideForkchoiceUpdate, StateSkipConfig, WhenSlotSkipped, canonical_head::{CachedHead, CanonicalHead}, - test_utils::{BeaconChainHarness, EphemeralHarnessType}, + test_utils::{BeaconChainHarness, EphemeralHarnessType, test_spec}, }; use execution_layer::{ ExecutionLayer, ForkchoiceState, PayloadAttributes, @@ -42,14 +42,11 @@ struct InvalidPayloadRig { impl InvalidPayloadRig { fn new() -> Self { - let spec = E::default_spec(); + let spec = test_spec::(); Self::new_with_spec(spec) } - fn new_with_spec(mut spec: ChainSpec) -> Self { - spec.altair_fork_epoch = Some(Epoch::new(0)); - spec.bellatrix_fork_epoch = Some(Epoch::new(0)); - + fn new_with_spec(spec: ChainSpec) -> Self { let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.into()) .chain_config(ChainConfig { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ea5f735bde..5410f26a5d 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -117,6 +117,7 @@ fn get_harness_import_all_data_columns( ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. 
let chain_config = ChainConfig { + ignore_ws_check: true, reconstruct_historic_states: true, ..ChainConfig::default() }; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 1884429a6a..fb86a1a845 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -15,7 +15,8 @@ use state_processing::EpochProcessingError; use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError}; use std::sync::LazyLock; use types::{ - BeaconState, BeaconStateError, BlockImportSource, Checkpoint, EthSpec, Hash256, MinimalEthSpec, + BeaconState, BeaconStateError, BlockImportSource, ChainSpec, Checkpoint, + DEFAULT_PRE_ELECTRA_WS_PERIOD, EthSpec, ForkName, Hash256, MainnetEthSpec, MinimalEthSpec, RelativeEpoch, Slot, }; @@ -38,6 +39,27 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness BeaconChainHarness> { + let chain_config = ChainConfig { + reconstruct_historic_states: true, + ..Default::default() + }; + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.clone().into()) + .chain_config(chain_config) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness +} + fn get_harness_with_config( validator_count: usize, chain_config: ChainConfig, @@ -1083,3 +1105,28 @@ async fn pseudo_finalize_with_lagging_split_update() { let expect_true_migration = false; pseudo_finalize_test_generic(epochs_per_migration, expect_true_migration).await; } + +#[tokio::test] +async fn test_compute_weak_subjectivity_period() { + type E = MainnetEthSpec; + let expected_ws_period_pre_electra = DEFAULT_PRE_ELECTRA_WS_PERIOD; + let expected_ws_period_post_electra = 256; + + // test pre-Electra variant (Altair) + let spec = ForkName::Altair.make_genesis_spec(E::default_spec()); + let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec); + let head_state = 
harness.get_current_state(); + + let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period_pre_electra); + + // test Electra variant + let spec = ForkName::Electra.make_genesis_spec(E::default_spec()); + let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec); + let head_state = harness.get_current_state(); + + let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period_post_electra); +} diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 720895bbe7..9861119ac1 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -13,7 +13,8 @@ use futures::prelude::*; use libp2p::PeerId; use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError, + SubstreamProtocol, }; use libp2p::swarm::{ConnectionId, Stream}; use logging::crit; @@ -888,6 +889,16 @@ where ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { self.on_dial_upgrade_error(info, error) } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: (proto, error), + .. 
+ }) => { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto, + error, + })); + } _ => { // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on // release notes more than compiler feedback @@ -924,7 +935,7 @@ where request.count() )), })); - return self.shutdown(None); + return; } } RequestType::BlobsByRange(request) => { @@ -940,7 +951,7 @@ where max_allowed, max_requested_blobs )), })); - return self.shutdown(None); + return; } } _ => {} diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index f0ac9d00f9..34d8efccd1 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -675,7 +675,7 @@ where E: EthSpec, { type Output = InboundOutput; - type Error = RPCError; + type Error = (Protocol, RPCError); type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { @@ -717,10 +717,12 @@ where ) .await { - Err(e) => Err(RPCError::from(e)), + Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(e)), _)) => Err(e), - Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)), + Ok((None, _)) => { + Err((versioned_protocol.protocol(), RPCError::IncompleteStream)) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 8b364f506c..2407038bc3 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -77,6 +77,14 @@ impl Quota { max_tokens: n, } } + + #[cfg(test)] + pub const fn n_every_millis(n: NonZeroU64, millis: u64) -> Self { + Quota { + replenish_all_every: Duration::from_millis(millis), + 
max_tokens: n, + } + } } /// Manages rate limiting of requests per peer, with differentiated rates per protocol. diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 90e2db9135..2a7ef955a1 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -4,6 +4,10 @@ use super::{ rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, }; use crate::rpc::rate_limiter::RateLimiterItem; +use futures::FutureExt; +use libp2p::{PeerId, swarm::NotifyHandler}; +use logging::crit; +use smallvec::SmallVec; use std::time::{SystemTime, UNIX_EPOCH}; use std::{ collections::{HashMap, VecDeque, hash_map::Entry}, @@ -11,11 +15,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use futures::FutureExt; -use libp2p::{PeerId, swarm::NotifyHandler}; -use logging::crit; -use smallvec::SmallVec; use tokio_util::time::DelayQueue; use tracing::debug; use types::{EthSpec, ForkContext}; @@ -234,9 +233,29 @@ impl SelfRateLimiter { pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { self.active_requests.remove(&peer_id); + let mut failed_requests = Vec::new(); + + self.ready_requests.retain(|(req_peer_id, rpc_send, _)| { + if let RPCSend::Request(request_id, req) = rpc_send { + if req_peer_id == &peer_id { + failed_requests.push((*request_id, req.protocol())); + // Remove the entry + false + } else { + // Keep the entry + true + } + } else { + debug_assert!( + false, + "Coding error: unexpected RPCSend variant {rpc_send:?}." + ); + false + } + }); + // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // should never really be large. 
So we iterate for simplicity - let mut failed_requests = Vec::new(); self.delayed_requests .retain(|(map_peer_id, protocol), queue| { if map_peer_id == &peer_id { @@ -252,6 +271,7 @@ impl SelfRateLimiter { true } }); + failed_requests } @@ -549,4 +569,72 @@ mod tests { .contains_key(&(peer2, Protocol::Ping)) ); } + + /// Test that `peer_disconnected` returns the IDs of pending requests. + #[tokio::test] + async fn test_peer_disconnected_returns_failed_requests() { + const REPLENISH_DURATION: u64 = 50; + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let config = OutboundRateLimiterConfig(RateLimiterConfig { + ping_quota: Quota::n_every_millis(NonZeroU64::new(1).unwrap(), REPLENISH_DURATION), + ..Default::default() + }); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(Some(config), fork_context).unwrap(); + let peer_id = PeerId::random(); + + for i in 1..=5u32 { + let result = limiter.allows( + peer_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + req_id: i, + lookup_id: i, + }, + }), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first request while other requests are added to the queue. + if i == 1 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + + // Wait until the tokens have been regenerated, then run `next_peer_request_ready`. + tokio::time::sleep(Duration::from_millis(REPLENISH_DURATION + 10)).await; + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + // Check that one of the pending requests has moved to ready_requests. + assert_eq!( + limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap() + .len(), + 3 + ); + assert_eq!(limiter.ready_requests.len(), 1); + + let mut failed_requests = limiter.peer_disconnected(peer_id); + + // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly. 
+ assert_eq!(failed_requests.len(), 4); + for i in 2..=5u32 { + let (request_id, protocol) = failed_requests.remove(0); + assert!(matches!( + request_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + }) if req_id == i + )); + assert_eq!(protocol, Protocol::Ping); + } + } } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 53939687d3..debe30b34f 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -5,8 +5,12 @@ use crate::common::spec_with_all_forks_enabled; use crate::common::{Protocol, build_tracing_subscriber}; use bls::Signature; use fixed_bytes::FixedBytesExtended; +use libp2p::PeerId; use lighthouse_network::rpc::{RequestType, methods::*}; -use lighthouse_network::service::api_types::AppRequestId; +use lighthouse_network::service::api_types::{ + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId, +}; use lighthouse_network::{NetworkEvent, ReportSource, Response}; use ssz::Encode; use ssz_types::{RuntimeVariableList, VariableList}; @@ -1783,3 +1787,157 @@ fn test_active_requests() { } }) } + +// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count, +// it bans the sender. +#[test] +fn test_request_too_large_blocks_by_range() { + let spec = Arc::new(spec_with_all_forks_enabled()); + + test_request_too_large( + AppRequestId::Sync(SyncRequestId::BlocksByRange(BlocksByRangeRequestId { + id: 1, + parent_request_id: ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + })), + RequestType::BlocksByRange(OldBlocksByRangeRequest::new( + 0, + spec.max_request_blocks(ForkName::Base) as u64 + 1, // exceeds the max request defined in the spec. 
+ 1, + )), + ); +} + +// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count, +// it bans the sender. +#[test] +fn test_request_too_large_blobs_by_range() { + let spec = Arc::new(spec_with_all_forks_enabled()); + + let max_request_blobs_count = spec.max_request_blob_sidecars(ForkName::Base) as u64 + / spec.max_blobs_per_block_within_fork(ForkName::Base); + test_request_too_large( + AppRequestId::Sync(SyncRequestId::BlobsByRange(BlobsByRangeRequestId { + id: 1, + parent_request_id: ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + })), + RequestType::BlobsByRange(BlobsByRangeRequest { + start_slot: 0, + count: max_request_blobs_count + 1, // exceeds the max request defined in the spec. + }), + ); +} + +// Test that when a node receives an invalid DataColumnsByRange request exceeding the columns count, +// it bans the sender. +#[test] +fn test_request_too_large_data_columns_by_range() { + test_request_too_large( + AppRequestId::Sync(SyncRequestId::DataColumnsByRange( + DataColumnsByRangeRequestId { + id: 1, + parent_request_id: DataColumnsByRangeRequester::ComponentsByRange( + ComponentsByRangeRequestId { + id: 1, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(1), + }, + }, + ), + peer: PeerId::random(), + }, + )), + RequestType::DataColumnsByRange(DataColumnsByRangeRequest { + start_slot: 0, + count: 0, + // exceeds the max request defined in the spec. + columns: vec![0; E::number_of_columns() + 1], + }), + ); +} + +fn test_request_too_large(app_request_id: AppRequestId, request: RequestType) { + // Set up the logging. 
+ let log_level = "debug"; + let enable_logging = true; + let _subscriber = build_tracing_subscriber(log_level, enable_logging); + let rt = Arc::new(Runtime::new().unwrap()); + let spec = Arc::new(spec_with_all_forks_enabled()); + + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + Protocol::Tcp, + false, + None, + ) + .await; + + // Build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(?request, %peer_id, "Sending RPC request"); + sender + .send_request(peer_id, app_request_id, request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + app_request_id, + response, + .. + } => { + debug!(?app_request_id, ?response, "Received response"); + } + NetworkEvent::RPCFailed { error, .. } => { + // This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit. + debug!(?error, "RPC failed"); + unreachable!(); + } + NetworkEvent::PeerDisconnected(peer_id) => { + // The receiver should disconnect as a result of the invalid request. + debug!(%peer_id, "Peer disconnected"); + // End the test. + return; + } + _ => {} + } + } + } + .instrument(info_span!("Sender")); + + // Build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await { + // This event should be unreachable, as the handler drops the invalid request. + unreachable!(); + } + } + } + .instrument(info_span!("Receiver")); + + tokio::select! 
{ + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }); +} diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0016f66c01 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock> = Lazy &["type"], ) }); +pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_success_total", + "Total count of sync RPC requests successes", + &["protocol"], + ) +}); +pub static SYNC_RPC_REQUEST_ERRORS: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_error_total", + "Total count of sync RPC requests errors", + &["protocol", "error"], + ) +}); +pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "sync_rpc_request_duration_sec", + "Time to complete a successful sync RPC request", + Ok(vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0, + ]), + &["protocol"], + ) +}); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7f4da9c0da..542625b8a3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1430,7 +1430,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } pub(crate) fn on_single_blob_response( @@ -1459,7 +1459,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1472,7 +1472,7 @@ impl SyncNetworkContext { let resp = self .data_columns_by_root_requests .on_response(id, 
rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1483,7 +1483,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blocks_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1494,7 +1494,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blobs_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1507,36 +1507,15 @@ impl SyncNetworkContext { let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) + self.on_rpc_response_result(resp, peer_id) } - fn on_rpc_response_result usize>( + /// Common handler for consistent scoring of RpcResponseError + fn on_rpc_response_result( &mut self, - id: I, - method: &'static str, resp: Option>, peer_id: PeerId, - get_count: F, ) -> Option> { - match &resp { - None => {} - Some(Ok((v, _))) => { - debug!( - %id, - method, - count = get_count(v), - "Sync RPC request completed" - ); - } - Some(Err(e)) => { - debug!( - %id, - method, - error = ?e, - "Sync RPC request error" - ); - } - } if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 3183c06d76..8f9540693e 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,11 @@ +use 
std::time::Instant; use std::{collections::hash_map::Entry, hash::Hash}; use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use tracing::Span; +use tracing::{Span, debug}; use types::{Hash256, Slot}; pub use blobs_by_range::BlobsByRangeRequestItems; @@ -18,7 +19,7 @@ pub use data_columns_by_root::{ use crate::metrics; -use super::{RpcEvent, RpcResponseResult}; +use super::{RpcEvent, RpcResponseError, RpcResponseResult}; mod blobs_by_range; mod blobs_by_root; @@ -51,6 +52,7 @@ struct ActiveRequest { peer_id: PeerId, // Error if the request terminates before receiving max expected responses expect_max_responses: bool, + start_instant: Instant, span: Span, } @@ -60,7 +62,7 @@ enum State { Errored, } -impl ActiveRequests { +impl ActiveRequests { pub fn new(name: &'static str) -> Self { Self { requests: <_>::default(), @@ -83,6 +85,7 @@ impl ActiveRequests { state: State::Active(items), peer_id, expect_max_responses, + start_instant: Instant::now(), span, }, ); @@ -112,7 +115,7 @@ impl ActiveRequests { return None; }; - match rpc_event { + let result = match rpc_event { // Handler of a success ReqResp chunk. Adds the item to the request accumulator. // `ActiveRequestItems` validates the item before appending to its internal state. 
RpcEvent::Response(item, seen_timestamp) => { @@ -126,7 +129,7 @@ impl ActiveRequests { Ok(true) => { let items = items.consume(); request.state = State::CompletedEarly; - Some(Ok((items, seen_timestamp))) + Some(Ok((items, seen_timestamp, request.start_instant.elapsed()))) } // Received item, but we are still expecting more Ok(false) => None, @@ -163,7 +166,11 @@ impl ActiveRequests { } .into())) } else { - Some(Ok((items.consume(), timestamp_now()))) + Some(Ok(( + items.consume(), + timestamp_now(), + request.start_instant.elapsed(), + ))) } } // Items already returned, ignore stream termination @@ -188,7 +195,41 @@ impl ActiveRequests { State::Errored => None, } } - } + }; + + result.map(|result| match result { + Ok((items, seen_timestamp, duration)) => { + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]); + metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration); + debug!( + %id, + method = self.name, + count = items.len(), + "Sync RPC request completed" + ); + + Ok((items, seen_timestamp)) + } + Err(e) => { + let err_str: &'static str = match &e { + RpcResponseError::RpcError(e) => e.into(), + RpcResponseError::VerifyError(e) => e.into(), + RpcResponseError::CustodyRequestError(_) => "CustodyRequestError", + RpcResponseError::BlockComponentCouplingError(_) => { + "BlockComponentCouplingError" + } + }; + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]); + debug!( + %id, + method = self.name, + error = ?e, + "Sync RPC request error" + ); + + Err(e) + } + }) } pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9553fe60ba..5c3e8058d9 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1404,6 +1404,16 @@ pub fn cli_app() -> Command { .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("ignore-ws-check") + .long("ignore-ws-check") + .help("Using this flag allows a 
node to run in a state that may expose it to long-range attacks. \ + For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \ + If you understand the risks, you can use this flag to disable the Weak Subjectivity check at startup.") + .action(ArgAction::SetTrue) + .help_heading(FLAG_HEADER) + .display_order(0) + ) .arg( Arg::new("builder-fallback-skips") .long("builder-fallback-skips") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 752cf10550..e6091d9213 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -780,6 +780,8 @@ pub fn get_config( client_config.chain.paranoid_block_proposal = cli_args.get_flag("paranoid-block-proposal"); + client_config.chain.ignore_ws_check = cli_args.get_flag("ignore-ws-check"); + /* * Builder fallback configs. */ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 6db2150e5f..e33da17e26 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -22,14 +22,9 @@ use types::{ChainSpec, Epoch, EthSpec, ForkName}; pub type ProductionClient = Client, BeaconNodeBackend>>; -/// The beacon node `Client` that will be used in production. +/// The beacon node `Client` that is used in production. /// /// Generic over some `EthSpec`. -/// -/// ## Notes: -/// -/// Despite being titled `Production...`, this code is not ready for production. The name -/// demonstrates an intention, not a promise. pub struct ProductionBeaconNode(ProductionClient); impl ProductionBeaconNode { diff --git a/book/src/help_bn.md b/book/src/help_bn.md index d3aa27c8a7..beb74da376 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -509,6 +509,12 @@ Flags: --http-enable-tls Serves the RESTful HTTP API server over TLS. This feature is currently experimental. + --ignore-ws-check + Using this flag allows a node to run in a state that may expose it to + long-range attacks. 
For more information please read this blog post: + https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity + If you understand the risks, you can use this flag to disable the Weak + Subjectivity check at startup. --import-all-attestations Import and aggregate all attestations, regardless of validator subscriptions. This will only import attestations from diff --git a/consensus/types/src/state/beacon_state.rs b/consensus/types/src/state/beacon_state.rs index 1352ded79e..1745908c40 100644 --- a/consensus/types/src/state/beacon_state.rs +++ b/consensus/types/src/state/beacon_state.rs @@ -55,9 +55,21 @@ use crate::{ }; pub const CACHED_EPOCHS: usize = 3; + +// Pre-electra WS calculations are not supported. On mainnet, pre-electra epochs are outside the weak subjectivity +// period. The default pre-electra WS value is set to 256 to allow for `basic-sim``, `fallback-sim`` test case `revert_minority_fork_on_resume` +// to pass. 256 is a small enough number to trigger the WS safety check pre-electra on mainnet. +pub const DEFAULT_PRE_ELECTRA_WS_PERIOD: u64 = 256; + const MAX_RANDOM_BYTE: u64 = (1 << 8) - 1; const MAX_RANDOM_VALUE: u64 = (1 << 16) - 1; +// `SAFETY_DECAY` is defined as the maximum percentage tolerable loss in the one-third +// safety margin of FFG finality. Thus, any attack exploiting the Weak Subjectivity Period has +// a safety margin of at least `1/3 - SAFETY_DECAY/100`. 
+// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/phase0/weak-subjectivity.md?plain=1#L50-L71 +const SAFETY_DECAY: u64 = 10; + pub type Validators = List::ValidatorRegistryLimit>; pub type Balances = List::ValidatorRegistryLimit>; @@ -3007,6 +3019,26 @@ impl BeaconState { Ok(()) } + /// Returns the weak subjectivity period for `self` + pub fn compute_weak_subjectivity_period( + &self, + spec: &ChainSpec, + ) -> Result { + let total_active_balance = self.get_total_active_balance()?; + let fork_name = self.fork_name_unchecked(); + + if fork_name.electra_enabled() { + let balance_churn_limit = self.get_balance_churn_limit(spec)?; + compute_weak_subjectivity_period_electra( + total_active_balance, + balance_churn_limit, + spec, + ) + } else { + Ok(Epoch::new(DEFAULT_PRE_ELECTRA_WS_PERIOD)) + } + } + /// Get the payload timeliness committee for the given `slot`. /// /// Requires the committee cache to be initialized. @@ -3382,3 +3414,75 @@ impl<'de, E: EthSpec> ContextDeserialize<'de, ForkName> for BeaconState { )) } } + +/// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L30 +pub fn compute_weak_subjectivity_period_electra( + total_active_balance: u64, + balance_churn_limit: u64, + spec: &ChainSpec, +) -> Result { + let epochs_for_validator_set_churn = SAFETY_DECAY + .safe_mul(total_active_balance)? 
+ .safe_div(balance_churn_limit.safe_mul(200)?)?; + let ws_period = spec + .min_validator_withdrawability_delay + .safe_add(epochs_for_validator_set_churn)?; + + Ok(ws_period) +} + +#[cfg(test)] +mod weak_subjectivity_tests { + use crate::state::beacon_state::compute_weak_subjectivity_period_electra; + use crate::{ChainSpec, Epoch, EthSpec, MainnetEthSpec}; + + const GWEI_PER_ETH: u64 = 1_000_000_000; + + #[test] + fn test_compute_weak_subjectivity_period_electra() { + let mut spec = MainnetEthSpec::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + spec.capella_fork_epoch = Some(Epoch::new(0)); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.electra_fork_epoch = Some(Epoch::new(0)); + + // A table of some expected values: + // https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L44-L54 + // (total_active_balance, expected_ws_period) + let expected_values: Vec<(u64, u64)> = vec![ + (1_048_576 * GWEI_PER_ETH, 665), + (2_097_152 * GWEI_PER_ETH, 1_075), + (4_194_304 * GWEI_PER_ETH, 1_894), + (8_388_608 * GWEI_PER_ETH, 3_532), + (16_777_216 * GWEI_PER_ETH, 3_532), + (33_554_432 * GWEI_PER_ETH, 3_532), + // This value cross referenced w/ + // beacon_chain/tests/tests.rs:test_compute_weak_subjectivity_period + (1536 * GWEI_PER_ETH, 256), + ]; + + for (total_active_balance, expected_ws_period) in expected_values { + let balance_churn_limit = get_balance_churn_limit(total_active_balance, &spec); + + let calculated_ws_period = compute_weak_subjectivity_period_electra( + total_active_balance, + balance_churn_limit, + &spec, + ) + .unwrap(); + + assert_eq!(calculated_ws_period, expected_ws_period); + } + } + + // calculate the balance_churn_limit without dealing with states + // and without initializing the active balance cache + fn get_balance_churn_limit(total_active_balance: u64, spec: &ChainSpec) -> u64 { + let churn = 
std::cmp::max( + spec.min_per_epoch_churn_limit_electra, + total_active_balance / spec.churn_limit_quotient, + ); + churn - (churn % spec.effective_balance_increment) + } +} diff --git a/consensus/types/src/state/mod.rs b/consensus/types/src/state/mod.rs index 309796d359..ea064fb7ac 100644 --- a/consensus/types/src/state/mod.rs +++ b/consensus/types/src/state/mod.rs @@ -17,7 +17,7 @@ pub use balance::Balance; pub use beacon_state::{ BeaconState, BeaconStateAltair, BeaconStateBase, BeaconStateBellatrix, BeaconStateCapella, BeaconStateDeneb, BeaconStateElectra, BeaconStateError, BeaconStateFulu, BeaconStateGloas, - BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, + BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, DEFAULT_PRE_ELECTRA_WS_PERIOD, }; pub use committee_cache::{ CommitteeCache, compute_committee_index_in_epoch, compute_committee_range_in_epoch, diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a2fad31f65..322787736b 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -295,6 +295,21 @@ fn paranoid_block_proposal_on() { .with_config(|config| assert!(config.chain.paranoid_block_proposal)); } +#[test] +fn ignore_ws_check_enabled() { + CommandLineTest::new() + .flag("ignore-ws-check", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.ignore_ws_check)); +} + +#[test] +fn ignore_ws_check_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.ignore_ws_check)); +} + #[test] fn reset_payload_statuses_default() { CommandLineTest::new()