Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-block-and-bid-production

This commit is contained in:
Eitan Seri- Levi
2026-02-11 14:53:39 -08:00
20 changed files with 583 additions and 65 deletions

View File

@@ -41,7 +41,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{ShutdownReason, TaskExecutor};
use tracing::{debug, error, info}; use tracing::{debug, error, info, warn};
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::data::CustodyIndex; use types::data::CustodyIndex;
use types::{ use types::{
@@ -848,6 +848,33 @@ where
)); ));
} }
// Check if the head snapshot is within the weak subjectivity period
let head_state = &head_snapshot.beacon_state;
let Ok(ws_period) = head_state.compute_weak_subjectivity_period(&self.spec) else {
return Err(format!(
"Unable to compute the weak subjectivity period at the head snapshot slot: {:?}",
head_state.slot()
));
};
if current_slot.epoch(E::slots_per_epoch())
> head_state.slot().epoch(E::slots_per_epoch()) + ws_period
{
if self.chain_config.ignore_ws_check {
warn!(
head_slot=%head_state.slot(),
%current_slot,
"The current head state is outside the weak subjectivity period. You are currently running a node that is susceptible to long range attacks. \
It is highly recommended to purge your db and checkpoint sync. For more information please \
read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity"
)
}
return Err(
"The current head state is outside the weak subjectivity period. A node in this state is susceptible to long range attacks. You should purge your db and \
checkpoint sync. For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \
If you understand the risks, it is possible to ignore this error with the --ignore-ws-check flag.".to_string()
);
}
let validator_pubkey_cache = self let validator_pubkey_cache = self
.validator_pubkey_cache .validator_pubkey_cache
.map(|mut validator_pubkey_cache| { .map(|mut validator_pubkey_cache| {

View File

@@ -117,6 +117,8 @@ pub struct ChainConfig {
/// On Holesky there is a block which is added to this set by default but which can be removed /// On Holesky there is a block which is added to this set by default but which can be removed
/// by using `--invalid-block-roots ""`. /// by using `--invalid-block-roots ""`.
pub invalid_block_roots: HashSet<Hash256>, pub invalid_block_roots: HashSet<Hash256>,
/// When set to true, the beacon node can be started even if the head state is outside the weak subjectivity period.
pub ignore_ws_check: bool,
/// Disable the getBlobs optimisation to fetch blobs from the EL mempool. /// Disable the getBlobs optimisation to fetch blobs from the EL mempool.
pub disable_get_blobs: bool, pub disable_get_blobs: bool,
/// The node's custody type, determining how many data columns to custody and sample. /// The node's custody type, determining how many data columns to custody and sample.
@@ -160,6 +162,7 @@ impl Default for ChainConfig {
block_publishing_delay: None, block_publishing_delay: None,
data_column_publishing_delay: None, data_column_publishing_delay: None,
invalid_block_roots: HashSet::new(), invalid_block_roots: HashSet::new(),
ignore_ws_check: false,
disable_get_blobs: false, disable_get_blobs: false,
node_custody_type: NodeCustodyType::Fullnode, node_custody_type: NodeCustodyType::Fullnode,
} }

View File

@@ -6,7 +6,7 @@ use beacon_chain::{
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, NotifyExecutionLayer, OverrideForkchoiceUpdate, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, NotifyExecutionLayer, OverrideForkchoiceUpdate,
StateSkipConfig, WhenSlotSkipped, StateSkipConfig, WhenSlotSkipped,
canonical_head::{CachedHead, CanonicalHead}, canonical_head::{CachedHead, CanonicalHead},
test_utils::{BeaconChainHarness, EphemeralHarnessType}, test_utils::{BeaconChainHarness, EphemeralHarnessType, test_spec},
}; };
use execution_layer::{ use execution_layer::{
ExecutionLayer, ForkchoiceState, PayloadAttributes, ExecutionLayer, ForkchoiceState, PayloadAttributes,
@@ -42,14 +42,11 @@ struct InvalidPayloadRig {
impl InvalidPayloadRig { impl InvalidPayloadRig {
fn new() -> Self { fn new() -> Self {
let spec = E::default_spec(); let spec = test_spec::<E>();
Self::new_with_spec(spec) Self::new_with_spec(spec)
} }
fn new_with_spec(mut spec: ChainSpec) -> Self { fn new_with_spec(spec: ChainSpec) -> Self {
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
let harness = BeaconChainHarness::builder(MainnetEthSpec) let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.into()) .spec(spec.into())
.chain_config(ChainConfig { .chain_config(ChainConfig {

View File

@@ -117,6 +117,7 @@ fn get_harness_import_all_data_columns(
) -> TestHarness { ) -> TestHarness {
// Most tests expect to retain historic states, so we use this as the default. // Most tests expect to retain historic states, so we use this as the default.
let chain_config = ChainConfig { let chain_config = ChainConfig {
ignore_ws_check: true,
reconstruct_historic_states: true, reconstruct_historic_states: true,
..ChainConfig::default() ..ChainConfig::default()
}; };

View File

@@ -15,7 +15,8 @@ use state_processing::EpochProcessingError;
use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError}; use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError};
use std::sync::LazyLock; use std::sync::LazyLock;
use types::{ use types::{
BeaconState, BeaconStateError, BlockImportSource, Checkpoint, EthSpec, Hash256, MinimalEthSpec, BeaconState, BeaconStateError, BlockImportSource, ChainSpec, Checkpoint,
DEFAULT_PRE_ELECTRA_WS_PERIOD, EthSpec, ForkName, Hash256, MainnetEthSpec, MinimalEthSpec,
RelativeEpoch, Slot, RelativeEpoch, Slot,
}; };
@@ -38,6 +39,27 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessTyp
) )
} }
fn get_harness_with_spec(
validator_count: usize,
spec: &ChainSpec,
) -> BeaconChainHarness<EphemeralHarnessType<MainnetEthSpec>> {
let chain_config = ChainConfig {
reconstruct_historic_states: true,
..Default::default()
};
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone().into())
.chain_config(chain_config)
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
harness.advance_slot();
harness
}
fn get_harness_with_config( fn get_harness_with_config(
validator_count: usize, validator_count: usize,
chain_config: ChainConfig, chain_config: ChainConfig,
@@ -1083,3 +1105,28 @@ async fn pseudo_finalize_with_lagging_split_update() {
let expect_true_migration = false; let expect_true_migration = false;
pseudo_finalize_test_generic(epochs_per_migration, expect_true_migration).await; pseudo_finalize_test_generic(epochs_per_migration, expect_true_migration).await;
} }
#[tokio::test]
async fn test_compute_weak_subjectivity_period() {
type E = MainnetEthSpec;
let expected_ws_period_pre_electra = DEFAULT_PRE_ELECTRA_WS_PERIOD;
let expected_ws_period_post_electra = 256;
// test Base variant
let spec = ForkName::Altair.make_genesis_spec(E::default_spec());
let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec);
let head_state = harness.get_current_state();
let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap();
assert_eq!(calculated_ws_period, expected_ws_period_pre_electra);
// test Electra variant
let spec = ForkName::Electra.make_genesis_spec(E::default_spec());
let harness = get_harness_with_spec(VALIDATOR_COUNT, &spec);
let head_state = harness.get_current_state();
let calculated_ws_period = head_state.compute_weak_subjectivity_period(&spec).unwrap();
assert_eq!(calculated_ws_period, expected_ws_period_post_electra);
}

View File

@@ -13,7 +13,8 @@ use futures::prelude::*;
use libp2p::PeerId; use libp2p::PeerId;
use libp2p::swarm::handler::{ use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError,
SubstreamProtocol,
}; };
use libp2p::swarm::{ConnectionId, Stream}; use libp2p::swarm::{ConnectionId, Stream};
use logging::crit; use logging::crit;
@@ -888,6 +889,16 @@ where
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error) self.on_dial_upgrade_error(info, error)
} }
ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
error: (proto, error),
..
}) => {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto,
error,
}));
}
_ => { _ => {
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback // release notes more than compiler feedback
@@ -924,7 +935,7 @@ where
request.count() request.count()
)), )),
})); }));
return self.shutdown(None); return;
} }
} }
RequestType::BlobsByRange(request) => { RequestType::BlobsByRange(request) => {
@@ -940,7 +951,7 @@ where
max_allowed, max_requested_blobs max_allowed, max_requested_blobs
)), )),
})); }));
return self.shutdown(None); return;
} }
} }
_ => {} _ => {}

View File

@@ -675,7 +675,7 @@ where
E: EthSpec, E: EthSpec,
{ {
type Output = InboundOutput<TSocket, E>; type Output = InboundOutput<TSocket, E>;
type Error = RPCError; type Error = (Protocol, RPCError);
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
@@ -717,10 +717,12 @@ where
) )
.await .await
{ {
Err(e) => Err(RPCError::from(e)), Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))),
Ok((Some(Ok(request)), stream)) => Ok((request, stream)), Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
Ok((Some(Err(e)), _)) => Err(e), Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)),
Ok((None, _)) => Err(RPCError::IncompleteStream), Ok((None, _)) => {
Err((versioned_protocol.protocol(), RPCError::IncompleteStream))
}
} }
} }
} }

View File

@@ -77,6 +77,14 @@ impl Quota {
max_tokens: n, max_tokens: n,
} }
} }
#[cfg(test)]
pub const fn n_every_millis(n: NonZeroU64, millis: u64) -> Self {
Quota {
replenish_all_every: Duration::from_millis(millis),
max_tokens: n,
}
}
} }
/// Manages rate limiting of requests per peer, with differentiated rates per protocol. /// Manages rate limiting of requests per peer, with differentiated rates per protocol.

View File

@@ -4,6 +4,10 @@ use super::{
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
}; };
use crate::rpc::rate_limiter::RateLimiterItem; use crate::rpc::rate_limiter::RateLimiterItem;
use futures::FutureExt;
use libp2p::{PeerId, swarm::NotifyHandler};
use logging::crit;
use smallvec::SmallVec;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{ use std::{
collections::{HashMap, VecDeque, hash_map::Entry}, collections::{HashMap, VecDeque, hash_map::Entry},
@@ -11,11 +15,6 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
}; };
use futures::FutureExt;
use libp2p::{PeerId, swarm::NotifyHandler};
use logging::crit;
use smallvec::SmallVec;
use tokio_util::time::DelayQueue; use tokio_util::time::DelayQueue;
use tracing::debug; use tracing::debug;
use types::{EthSpec, ForkContext}; use types::{EthSpec, ForkContext};
@@ -234,9 +233,29 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
self.active_requests.remove(&peer_id); self.active_requests.remove(&peer_id);
let mut failed_requests = Vec::new();
self.ready_requests.retain(|(req_peer_id, rpc_send, _)| {
if let RPCSend::Request(request_id, req) = rpc_send {
if req_peer_id == &peer_id {
failed_requests.push((*request_id, req.protocol()));
// Remove the entry
false
} else {
// Keep the entry
true
}
} else {
debug_assert!(
false,
"Coding error: unexpected RPCSend variant {rpc_send:?}."
);
false
}
});
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity // should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
self.delayed_requests self.delayed_requests
.retain(|(map_peer_id, protocol), queue| { .retain(|(map_peer_id, protocol), queue| {
if map_peer_id == &peer_id { if map_peer_id == &peer_id {
@@ -252,6 +271,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
true true
} }
}); });
failed_requests failed_requests
} }
@@ -549,4 +569,72 @@ mod tests {
.contains_key(&(peer2, Protocol::Ping)) .contains_key(&(peer2, Protocol::Ping))
); );
} }
/// Test that `peer_disconnected` returns the IDs of pending requests.
#[tokio::test]
async fn test_peer_disconnected_returns_failed_requests() {
const REPLENISH_DURATION: u64 = 50;
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let config = OutboundRateLimiterConfig(RateLimiterConfig {
ping_quota: Quota::n_every_millis(NonZeroU64::new(1).unwrap(), REPLENISH_DURATION),
..Default::default()
});
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(Some(config), fork_context).unwrap();
let peer_id = PeerId::random();
for i in 1..=5u32 {
let result = limiter.allows(
peer_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
req_id: i,
lookup_id: i,
},
}),
RequestType::Ping(Ping { data: i as u64 }),
);
// Check that the limiter allows the first request while other requests are added to the queue.
if i == 1 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
// Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
tokio::time::sleep(Duration::from_millis(REPLENISH_DURATION + 10)).await;
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
// Check that one of the pending requests has moved to ready_requests.
assert_eq!(
limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap()
.len(),
3
);
assert_eq!(limiter.ready_requests.len(), 1);
let mut failed_requests = limiter.peer_disconnected(peer_id);
// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
assert_eq!(failed_requests.len(), 4);
for i in 2..=5u32 {
let (request_id, protocol) = failed_requests.remove(0);
assert!(matches!(
request_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
}) if req_id == i
));
assert_eq!(protocol, Protocol::Ping);
}
}
} }

View File

@@ -5,8 +5,12 @@ use crate::common::spec_with_all_forks_enabled;
use crate::common::{Protocol, build_tracing_subscriber}; use crate::common::{Protocol, build_tracing_subscriber};
use bls::Signature; use bls::Signature;
use fixed_bytes::FixedBytesExtended; use fixed_bytes::FixedBytesExtended;
use libp2p::PeerId;
use lighthouse_network::rpc::{RequestType, methods::*}; use lighthouse_network::rpc::{RequestType, methods::*};
use lighthouse_network::service::api_types::AppRequestId; use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId,
};
use lighthouse_network::{NetworkEvent, ReportSource, Response}; use lighthouse_network::{NetworkEvent, ReportSource, Response};
use ssz::Encode; use ssz::Encode;
use ssz_types::{RuntimeVariableList, VariableList}; use ssz_types::{RuntimeVariableList, VariableList};
@@ -1783,3 +1787,157 @@ fn test_active_requests() {
} }
}) })
} }
// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count,
// it bans the sender.
#[test]
fn test_request_too_large_blocks_by_range() {
let spec = Arc::new(spec_with_all_forks_enabled());
test_request_too_large(
AppRequestId::Sync(SyncRequestId::BlocksByRange(BlocksByRangeRequestId {
id: 1,
parent_request_id: ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
})),
RequestType::BlocksByRange(OldBlocksByRangeRequest::new(
0,
spec.max_request_blocks(ForkName::Base) as u64 + 1, // exceeds the max request defined in the spec.
1,
)),
);
}
// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count,
// it bans the sender.
#[test]
fn test_request_too_large_blobs_by_range() {
let spec = Arc::new(spec_with_all_forks_enabled());
let max_request_blobs_count = spec.max_request_blob_sidecars(ForkName::Base) as u64
/ spec.max_blobs_per_block_within_fork(ForkName::Base);
test_request_too_large(
AppRequestId::Sync(SyncRequestId::BlobsByRange(BlobsByRangeRequestId {
id: 1,
parent_request_id: ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
})),
RequestType::BlobsByRange(BlobsByRangeRequest {
start_slot: 0,
count: max_request_blobs_count + 1, // exceeds the max request defined in the spec.
}),
);
}
// Test that when a node receives an invalid DataColumnsByRange request exceeding the columns count,
// it bans the sender.
#[test]
fn test_request_too_large_data_columns_by_range() {
test_request_too_large(
AppRequestId::Sync(SyncRequestId::DataColumnsByRange(
DataColumnsByRangeRequestId {
id: 1,
parent_request_id: DataColumnsByRangeRequester::ComponentsByRange(
ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
),
peer: PeerId::random(),
},
)),
RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
start_slot: 0,
count: 0,
// exceeds the max request defined in the spec.
columns: vec![0; E::number_of_columns() + 1],
}),
);
}
fn test_request_too_large(app_request_id: AppRequestId, request: RequestType<E>) {
// Set up the logging.
let log_level = "debug";
let enable_logging = true;
let _subscriber = build_tracing_subscriber(log_level, enable_logging);
let rt = Arc::new(Runtime::new().unwrap());
let spec = Arc::new(spec_with_all_forks_enabled());
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
Protocol::Tcp,
false,
None,
)
.await;
// Build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
debug!(?request, %peer_id, "Sending RPC request");
sender
.send_request(peer_id, app_request_id, request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
app_request_id,
response,
..
} => {
debug!(?app_request_id, ?response, "Received response");
}
NetworkEvent::RPCFailed { error, .. } => {
// This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit.
debug!(?error, "RPC failed");
unreachable!();
}
NetworkEvent::PeerDisconnected(peer_id) => {
// The receiver should disconnect as a result of the invalid request.
debug!(%peer_id, "Peer disconnected");
// End the test.
return;
}
_ => {}
}
}
}
.instrument(info_span!("Sender"));
// Build the receiver future
let receiver_future = async {
loop {
if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await {
// This event should be unreachable, as the handler drops the invalid request.
unreachable!();
}
}
}
.instrument(info_span!("Receiver"));
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
});
}

View File

@@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock<Result<IntCounterVec>> = Lazy
&["type"], &["type"],
) )
}); });
pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_success_total",
"Total count of sync RPC requests successes",
&["protocol"],
)
});
pub static SYNC_RPC_REQUEST_ERRORS: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_error_total",
"Total count of sync RPC requests errors",
&["protocol", "error"],
)
});
pub static SYNC_RPC_REQUEST_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
try_create_histogram_vec_with_buckets(
"sync_rpc_request_duration_sec",
"Time to complete a successful sync RPC requesst",
Ok(vec![
0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0,
]),
&["protocol"],
)
});
/* /*
* Block Delay Metrics * Block Delay Metrics

View File

@@ -1430,7 +1430,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
}) })
}); });
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) self.on_rpc_response_result(resp, peer_id)
} }
pub(crate) fn on_single_blob_response( pub(crate) fn on_single_blob_response(
@@ -1459,7 +1459,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
}) })
}); });
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) self.on_rpc_response_result(resp, peer_id)
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -1472,7 +1472,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self let resp = self
.data_columns_by_root_requests .data_columns_by_root_requests
.on_response(id, rpc_event); .on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) self.on_rpc_response_result(resp, peer_id)
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -1483,7 +1483,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>, rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> { ) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
let resp = self.blocks_by_range_requests.on_response(id, rpc_event); let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) self.on_rpc_response_result(resp, peer_id)
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -1494,7 +1494,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>, rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> { ) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
let resp = self.blobs_by_range_requests.on_response(id, rpc_event); let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) self.on_rpc_response_result(resp, peer_id)
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -1507,36 +1507,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self let resp = self
.data_columns_by_range_requests .data_columns_by_range_requests
.on_response(id, rpc_event); .on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) self.on_rpc_response_result(resp, peer_id)
} }
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>( /// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self, &mut self,
id: I,
method: &'static str,
resp: Option<RpcResponseResult<R>>, resp: Option<RpcResponseResult<R>>,
peer_id: PeerId, peer_id: PeerId,
get_count: F,
) -> Option<RpcResponseResult<R>> { ) -> Option<RpcResponseResult<R>> {
match &resp {
None => {}
Some(Ok((v, _))) => {
debug!(
%id,
method,
count = get_count(v),
"Sync RPC request completed"
);
}
Some(Err(e)) => {
debug!(
%id,
method,
error = ?e,
"Sync RPC request error"
);
}
}
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
} }

View File

@@ -1,10 +1,11 @@
use std::time::Instant;
use std::{collections::hash_map::Entry, hash::Hash}; use std::{collections::hash_map::Entry, hash::Hash};
use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use tracing::Span; use tracing::{Span, debug};
use types::{Hash256, Slot}; use types::{Hash256, Slot};
pub use blobs_by_range::BlobsByRangeRequestItems; pub use blobs_by_range::BlobsByRangeRequestItems;
@@ -18,7 +19,7 @@ pub use data_columns_by_root::{
use crate::metrics; use crate::metrics;
use super::{RpcEvent, RpcResponseResult}; use super::{RpcEvent, RpcResponseError, RpcResponseResult};
mod blobs_by_range; mod blobs_by_range;
mod blobs_by_root; mod blobs_by_root;
@@ -51,6 +52,7 @@ struct ActiveRequest<T: ActiveRequestItems> {
peer_id: PeerId, peer_id: PeerId,
// Error if the request terminates before receiving max expected responses // Error if the request terminates before receiving max expected responses
expect_max_responses: bool, expect_max_responses: bool,
start_instant: Instant,
span: Span, span: Span,
} }
@@ -60,7 +62,7 @@ enum State<T> {
Errored, Errored,
} }
impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> { impl<K: Copy + Eq + Hash + std::fmt::Display, T: ActiveRequestItems> ActiveRequests<K, T> {
pub fn new(name: &'static str) -> Self { pub fn new(name: &'static str) -> Self {
Self { Self {
requests: <_>::default(), requests: <_>::default(),
@@ -83,6 +85,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
state: State::Active(items), state: State::Active(items),
peer_id, peer_id,
expect_max_responses, expect_max_responses,
start_instant: Instant::now(),
span, span,
}, },
); );
@@ -112,7 +115,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
return None; return None;
}; };
match rpc_event { let result = match rpc_event {
// Handler of a success ReqResp chunk. Adds the item to the request accumulator. // Handler of a success ReqResp chunk. Adds the item to the request accumulator.
// `ActiveRequestItems` validates the item before appending to its internal state. // `ActiveRequestItems` validates the item before appending to its internal state.
RpcEvent::Response(item, seen_timestamp) => { RpcEvent::Response(item, seen_timestamp) => {
@@ -126,7 +129,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
Ok(true) => { Ok(true) => {
let items = items.consume(); let items = items.consume();
request.state = State::CompletedEarly; request.state = State::CompletedEarly;
Some(Ok((items, seen_timestamp))) Some(Ok((items, seen_timestamp, request.start_instant.elapsed())))
} }
// Received item, but we are still expecting more // Received item, but we are still expecting more
Ok(false) => None, Ok(false) => None,
@@ -163,7 +166,11 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
} }
.into())) .into()))
} else { } else {
Some(Ok((items.consume(), timestamp_now()))) Some(Ok((
items.consume(),
timestamp_now(),
request.start_instant.elapsed(),
)))
} }
} }
// Items already returned, ignore stream termination // Items already returned, ignore stream termination
@@ -188,7 +195,41 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
State::Errored => None, State::Errored => None,
} }
} }
};
result.map(|result| match result {
Ok((items, seen_timestamp, duration)) => {
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]);
metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration);
debug!(
%id,
method = self.name,
count = items.len(),
"Sync RPC request completed"
);
Ok((items, seen_timestamp))
} }
Err(e) => {
let err_str: &'static str = match &e {
RpcResponseError::RpcError(e) => e.into(),
RpcResponseError::VerifyError(e) => e.into(),
RpcResponseError::CustodyRequestError(_) => "CustodyRequestError",
RpcResponseError::BlockComponentCouplingError(_) => {
"BlockComponentCouplingError"
}
};
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]);
debug!(
%id,
method = self.name,
error = ?e,
"Sync RPC request error"
);
Err(e)
}
})
} }
pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> { pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {

View File

@@ -1404,6 +1404,16 @@ pub fn cli_app() -> Command {
.help_heading(FLAG_HEADER) .help_heading(FLAG_HEADER)
.display_order(0) .display_order(0)
) )
.arg(
Arg::new("ignore-ws-check")
.long("ignore-ws-check")
.help("Using this flag allows a node to run in a state that may expose it to long-range attacks. \
For more information please read this blog post: https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity \
If you understand the risks, you can use this flag to disable the Weak Subjectivity check at startup.")
.action(ArgAction::SetTrue)
.help_heading(FLAG_HEADER)
.display_order(0)
)
.arg( .arg(
Arg::new("builder-fallback-skips") Arg::new("builder-fallback-skips")
.long("builder-fallback-skips") .long("builder-fallback-skips")

View File

@@ -780,6 +780,8 @@ pub fn get_config<E: EthSpec>(
client_config.chain.paranoid_block_proposal = cli_args.get_flag("paranoid-block-proposal"); client_config.chain.paranoid_block_proposal = cli_args.get_flag("paranoid-block-proposal");
client_config.chain.ignore_ws_check = cli_args.get_flag("ignore-ws-check");
/* /*
* Builder fallback configs. * Builder fallback configs.
*/ */

View File

@@ -22,14 +22,9 @@ use types::{ChainSpec, Epoch, EthSpec, ForkName};
pub type ProductionClient<E> = pub type ProductionClient<E> =
Client<Witness<SystemTimeSlotClock, E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>; Client<Witness<SystemTimeSlotClock, E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>;
/// The beacon node `Client` that will be used in production. /// The beacon node `Client` that is used in production.
/// ///
/// Generic over some `EthSpec`. /// Generic over some `EthSpec`.
///
/// ## Notes:
///
/// Despite being titled `Production...`, this code is not ready for production. The name
/// demonstrates an intention, not a promise.
pub struct ProductionBeaconNode<E: EthSpec>(ProductionClient<E>); pub struct ProductionBeaconNode<E: EthSpec>(ProductionClient<E>);
impl<E: EthSpec> ProductionBeaconNode<E> { impl<E: EthSpec> ProductionBeaconNode<E> {

View File

@@ -509,6 +509,12 @@ Flags:
--http-enable-tls --http-enable-tls
Serves the RESTful HTTP API server over TLS. This feature is currently Serves the RESTful HTTP API server over TLS. This feature is currently
experimental. experimental.
--ignore-ws-check
Using this flag allows a node to run in a state that may expose it to
long-range attacks. For more information please read this blog post:
https://blog.ethereum.org/2014/11/25/proof-stake-learned-love-weak-subjectivity
If you understand the risks, you can use this flag to disable the Weak
Subjectivity check at startup.
--import-all-attestations --import-all-attestations
Import and aggregate all attestations, regardless of validator Import and aggregate all attestations, regardless of validator
subscriptions. This will only import attestations from subscriptions. This will only import attestations from

View File

@@ -55,9 +55,21 @@ use crate::{
}; };
pub const CACHED_EPOCHS: usize = 3; pub const CACHED_EPOCHS: usize = 3;
// Pre-electra WS calculations are not supported. On mainnet, pre-electra epochs are outside the weak subjectivity
// period. The default pre-electra WS value is set to 256 to allow for `basic-sim``, `fallback-sim`` test case `revert_minority_fork_on_resume`
// to pass. 256 is a small enough number to trigger the WS safety check pre-electra on mainnet.
pub const DEFAULT_PRE_ELECTRA_WS_PERIOD: u64 = 256;
const MAX_RANDOM_BYTE: u64 = (1 << 8) - 1; const MAX_RANDOM_BYTE: u64 = (1 << 8) - 1;
const MAX_RANDOM_VALUE: u64 = (1 << 16) - 1; const MAX_RANDOM_VALUE: u64 = (1 << 16) - 1;
// `SAFETY_DECAY` is defined as the maximum percentage tolerable loss in the one-third
// safety margin of FFG finality. Thus, any attack exploiting the Weak Subjectivity Period has
// a safety margin of at least `1/3 - SAFETY_DECAY/100`.
// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/phase0/weak-subjectivity.md?plain=1#L50-L71
const SAFETY_DECAY: u64 = 10;
pub type Validators<E> = List<Validator, <E as EthSpec>::ValidatorRegistryLimit>; pub type Validators<E> = List<Validator, <E as EthSpec>::ValidatorRegistryLimit>;
pub type Balances<E> = List<u64, <E as EthSpec>::ValidatorRegistryLimit>; pub type Balances<E> = List<u64, <E as EthSpec>::ValidatorRegistryLimit>;
@@ -3007,6 +3019,26 @@ impl<E: EthSpec> BeaconState<E> {
Ok(()) Ok(())
} }
/// Returns the weak subjectivity period for `self`
pub fn compute_weak_subjectivity_period(
&self,
spec: &ChainSpec,
) -> Result<Epoch, BeaconStateError> {
let total_active_balance = self.get_total_active_balance()?;
let fork_name = self.fork_name_unchecked();
if fork_name.electra_enabled() {
let balance_churn_limit = self.get_balance_churn_limit(spec)?;
compute_weak_subjectivity_period_electra(
total_active_balance,
balance_churn_limit,
spec,
)
} else {
Ok(Epoch::new(DEFAULT_PRE_ELECTRA_WS_PERIOD))
}
}
/// Get the payload timeliness committee for the given `slot`. /// Get the payload timeliness committee for the given `slot`.
/// ///
/// Requires the committee cache to be initialized. /// Requires the committee cache to be initialized.
@@ -3382,3 +3414,75 @@ impl<'de, E: EthSpec> ContextDeserialize<'de, ForkName> for BeaconState<E> {
)) ))
} }
} }
/// Spec: https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L30
pub fn compute_weak_subjectivity_period_electra(
total_active_balance: u64,
balance_churn_limit: u64,
spec: &ChainSpec,
) -> Result<Epoch, BeaconStateError> {
let epochs_for_validator_set_churn = SAFETY_DECAY
.safe_mul(total_active_balance)?
.safe_div(balance_churn_limit.safe_mul(200)?)?;
let ws_period = spec
.min_validator_withdrawability_delay
.safe_add(epochs_for_validator_set_churn)?;
Ok(ws_period)
}
#[cfg(test)]
mod weak_subjectivity_tests {
use crate::state::beacon_state::compute_weak_subjectivity_period_electra;
use crate::{ChainSpec, Epoch, EthSpec, MainnetEthSpec};
const GWEI_PER_ETH: u64 = 1_000_000_000;
#[test]
fn test_compute_weak_subjectivity_period_electra() {
let mut spec = MainnetEthSpec::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
spec.capella_fork_epoch = Some(Epoch::new(0));
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.electra_fork_epoch = Some(Epoch::new(0));
// A table of some expected values:
// https://github.com/ethereum/consensus-specs/blob/1937aff86b41b5171a9bc3972515986f1bbbf303/specs/electra/weak-subjectivity.md?plain=1#L44-L54
// (total_active_balance, expected_ws_period)
let expected_values: Vec<(u64, u64)> = vec![
(1_048_576 * GWEI_PER_ETH, 665),
(2_097_152 * GWEI_PER_ETH, 1_075),
(4_194_304 * GWEI_PER_ETH, 1_894),
(8_388_608 * GWEI_PER_ETH, 3_532),
(16_777_216 * GWEI_PER_ETH, 3_532),
(33_554_432 * GWEI_PER_ETH, 3_532),
// This value cross referenced w/
// beacon_chain/tests/tests.rs:test_compute_weak_subjectivity_period
(1536 * GWEI_PER_ETH, 256),
];
for (total_active_balance, expected_ws_period) in expected_values {
let balance_churn_limit = get_balance_churn_limit(total_active_balance, &spec);
let calculated_ws_period = compute_weak_subjectivity_period_electra(
total_active_balance,
balance_churn_limit,
&spec,
)
.unwrap();
assert_eq!(calculated_ws_period, expected_ws_period);
}
}
// caclulate the balance_churn_limit without dealing with states
// and without initializing the active balance cache
fn get_balance_churn_limit(total_active_balance: u64, spec: &ChainSpec) -> u64 {
let churn = std::cmp::max(
spec.min_per_epoch_churn_limit_electra,
total_active_balance / spec.churn_limit_quotient,
);
churn - (churn % spec.effective_balance_increment)
}
}

View File

@@ -17,7 +17,7 @@ pub use balance::Balance;
pub use beacon_state::{ pub use beacon_state::{
BeaconState, BeaconStateAltair, BeaconStateBase, BeaconStateBellatrix, BeaconStateCapella, BeaconState, BeaconStateAltair, BeaconStateBase, BeaconStateBellatrix, BeaconStateCapella,
BeaconStateDeneb, BeaconStateElectra, BeaconStateError, BeaconStateFulu, BeaconStateGloas, BeaconStateDeneb, BeaconStateElectra, BeaconStateError, BeaconStateFulu, BeaconStateGloas,
BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, BeaconStateHash, BeaconStateRef, CACHED_EPOCHS, DEFAULT_PRE_ELECTRA_WS_PERIOD,
}; };
pub use committee_cache::{ pub use committee_cache::{
CommitteeCache, compute_committee_index_in_epoch, compute_committee_range_in_epoch, CommitteeCache, compute_committee_index_in_epoch, compute_committee_range_in_epoch,

View File

@@ -295,6 +295,21 @@ fn paranoid_block_proposal_on() {
.with_config(|config| assert!(config.chain.paranoid_block_proposal)); .with_config(|config| assert!(config.chain.paranoid_block_proposal));
} }
#[test]
fn ignore_ws_check_enabled() {
CommandLineTest::new()
.flag("ignore-ws-check", None)
.run_with_zero_port()
.with_config(|config| assert!(config.chain.ignore_ws_check));
}
#[test]
fn ignore_ws_check_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert!(!config.chain.ignore_ws_check));
}
#[test] #[test]
fn reset_payload_statuses_default() { fn reset_payload_statuses_default() {
CommandLineTest::new() CommandLineTest::new()