Compare commits

...

10 Commits

Author SHA1 Message Date
Jimmy Chen
95f12d0927 Bump version to v8.1.1 (#8853) 2026-02-27 16:48:56 +11:00
Jimmy Chen
8cf6ffac4b Update yanked keccak 0.1.5 to 0.1.6 (#8900)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
2026-02-27 16:22:19 +11:00
Jimmy Chen
4588971085 Add sync batch state metrics (#8847)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
2026-02-19 01:57:53 +00:00
Pawan Dhananjay
561898fc1c Process head_chains in descending order of number of peers (#8859)
N/A


  Another find by @gitToki. Sort the preferred_ids in descending order, as originally intended per the comment in the function.


Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
2026-02-19 00:38:56 +00:00
Michael Sproul
be799cb2ad Validator client head monitor timeout fix (#8846)
Fix a bug in v8.1.0 whereby the VC times out continuously with:

> Feb 18 02:03:48.030 WARN  Head service failed retrying starting next slot  error: "Head monitoring stream error, node: 0, error: SseClient(Transport(reqwest::Error { kind: Decode, source: reqwest::Error { kind: Body, source: TimedOut } }))"


  - Remove the existing timeout for the events API by using `Duration::MAX`. This is necessary as the client is configured with a default timeout. This is the only way to override/remove it.
- DO NOT add a `read_timeout` (yet), as this would need to be configured on a per-client basis. We do not want to create a new Client for every call as the early commits on this branch were doing, as this would bypass the TLS cert config, and is also wasteful.


Co-Authored-By: hopinheimer <knmanas6@gmail.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>
2026-02-18 05:28:17 +00:00
Pawan Dhananjay
c5b4580e37 Return correct variant for snappy errors (#8841)
N/A


  Handle snappy crate errors as InvalidData instead of IoError.


Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
2026-02-18 04:17:07 +00:00
Jimmy Chen
691c8cf8e6 Fix duplicate data columns in DataColumnsByRange responses (#8843)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
2026-02-18 04:16:57 +00:00
Akihito Nakano
c61665b3a1 Penalize peers that send an invalid rpc request (#6986)
Since https://github.com/sigp/lighthouse/pull/6847, invalid `BlocksByRange`/`BlobsByRange` requests, which do not comply with the spec, are [handled in the Handler](3d16d1080f/beacon_node/lighthouse_network/src/rpc/handler.rs (L880-L911)). Any peer that sends an invalid request is penalized and disconnected.

However, other kinds of invalid rpc request, which result in decoding errors, are just dropped. No penalty is applied and the connection with the peer remains.


  I have added handling for the `ListenUpgradeError` event to notify the application of an `RPCError::InvalidData` error and disconnect from the peer that sent the invalid rpc request.

I also added tests for handling invalid rpc requests.


Co-Authored-By: ackintosh <sora.akatsuki@gmail.com>
2026-02-18 14:43:59 +11:00
Jimmy Chen
d4ec006a34 Update time to fix cargo audit failure (#8764) 2026-02-18 14:03:04 +11:00
0xMushow
9065e4a56e fix(beacon_node): add pruning of observed_column_sidecars (#8531)
None


  I noticed that `observed_column_sidecars` is missing its prune call in the finalization handler, which results in a slow memory leak (**7MB/day**) on long-running nodes:

13dfa9200f/beacon_node/beacon_chain/src/canonical_head.rs (L940-L959)

Both caches use the same generic type `ObservedDataSidecars<T>:`
22ec4b3271/beacon_node/beacon_chain/src/beacon_chain.rs (L413-L416)

The type's documentation explicitly requires manual pruning:

>  "*The cache supports pruning based upon the finalized epoch. It does not automatically prune, you must call Self::prune manually.*"


b4704eab4a/beacon_node/beacon_chain/src/observed_data_sidecars.rs (L66-L74)

Currently:
- `observed_blob_sidecars` => pruned
- `observed_column_sidecars` => **NOT** pruned

Without pruning, the underlying HashMap accumulates entries indefinitely, causing continuous memory growth until the node restarts.


Co-Authored-By: Antoine James <antoine@ethereum.org>
2026-02-18 12:38:05 +11:00
18 changed files with 484 additions and 42 deletions

39
Cargo.lock generated
View File

@@ -4,7 +4,7 @@ version = 4
[[package]]
name = "account_manager"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"account_utils",
"bls",
@@ -1276,7 +1276,7 @@ dependencies = [
[[package]]
name = "beacon_node"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"account_utils",
"beacon_chain",
@@ -1513,7 +1513,7 @@ dependencies = [
[[package]]
name = "boot_node"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"beacon_node",
"bytes",
@@ -4832,9 +4832,9 @@ dependencies = [
[[package]]
name = "keccak"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
dependencies = [
"cpufeatures",
]
@@ -4897,7 +4897,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lcli"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"account_utils",
"beacon_chain",
@@ -5383,7 +5383,7 @@ dependencies = [
[[package]]
name = "lighthouse"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"account_manager",
"account_utils",
@@ -5515,7 +5515,7 @@ dependencies = [
[[package]]
name = "lighthouse_version"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"regex",
]
@@ -6323,9 +6323,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
[[package]]
name = "num-integer"
@@ -8899,30 +8899,30 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.44"
version = "0.3.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde",
"serde_core",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.6"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.24"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
dependencies = [
"num-conv",
"time-core",
@@ -9622,7 +9622,7 @@ dependencies = [
[[package]]
name = "validator_client"
version = "8.1.0"
version = "8.1.1"
dependencies = [
"account_utils",
"beacon_node_fallback",
@@ -10607,8 +10607,7 @@ dependencies = [
[[package]]
name = "yamux"
version = "0.13.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deab71f2e20691b4728b349c6cee8fc7223880fa67b6b4f92225ec32225447e5"
source = "git+https://github.com/sigp/rust-yamux?rev=575b17c0f44f4253079a6bafaa2de74ca1d6dfaa#575b17c0f44f4253079a6bafaa2de74ca1d6dfaa"
dependencies = [
"futures",
"log",

View File

@@ -91,7 +91,7 @@ resolver = "2"
[workspace.package]
edition = "2024"
version = "8.1.0"
version = "8.1.1"
[workspace.dependencies]
account_utils = { path = "common/account_utils" }
@@ -303,3 +303,4 @@ debug = true
[patch.crates-io]
quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" }
yamux = { git = "https://github.com/sigp/rust-yamux", rev = "575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" }

View File

@@ -918,6 +918,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.start_slot(T::EthSpec::slots_per_epoch()),
);
self.observed_column_sidecars.write().prune(
new_view
.finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()),
);
self.observed_slashable.write().prune(
new_view
.finalized_checkpoint

View File

@@ -457,6 +457,9 @@ fn handle_error<T>(
Ok(None)
}
}
// All snappy errors from the snap crate bubble up as `Other` kind errors
// that imply invalid response
ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())),
_ => Err(RPCError::from(err)),
}
}
@@ -2317,4 +2320,43 @@ mod tests {
RPCError::InvalidData(_)
));
}
/// Test invalid snappy response.
#[test]
fn test_invalid_snappy_response() {
let spec = spec_with_all_forks_enabled();
let fork_ctx = Arc::new(fork_context(ForkName::latest(), &spec));
let max_packet_size = spec.max_payload_size as usize; // 10 MiB.
let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol.clone(),
max_packet_size,
fork_ctx.clone(),
);
let mut payload = BytesMut::new();
payload.extend_from_slice(&[0u8]);
let deneb_epoch = spec.deneb_fork_epoch.unwrap();
payload.extend_from_slice(&fork_ctx.context_bytes(deneb_epoch));
// Claim the MAXIMUM allowed size (10 MiB)
let claimed_size = max_packet_size;
let mut uvi_codec: Uvi<usize> = Uvi::default();
uvi_codec.encode(claimed_size, &mut payload).unwrap();
payload.extend_from_slice(&[0xBB; 16]); // Junk snappy.
let result = codec.decode(&mut payload);
assert!(result.is_err(), "Expected decode to fail");
// IoError = reached snappy decode (allocation happened).
let err = result.unwrap_err();
assert!(
matches!(err, RPCError::InvalidData(_)),
"Should return invalid data variant {}",
err
);
}
}

View File

@@ -13,7 +13,8 @@ use futures::prelude::*;
use libp2p::PeerId;
use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p::swarm::{ConnectionId, Stream};
use logging::crit;
@@ -888,6 +889,16 @@ where
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
error: (proto, error),
..
}) => {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto,
error,
}));
}
_ => {
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
@@ -924,7 +935,7 @@ where
request.count()
)),
}));
return self.shutdown(None);
return;
}
}
RequestType::BlobsByRange(request) => {
@@ -940,7 +951,7 @@ where
max_allowed, max_requested_blobs
)),
}));
return self.shutdown(None);
return;
}
}
_ => {}

View File

@@ -675,7 +675,7 @@ where
E: EthSpec,
{
type Output = InboundOutput<TSocket, E>;
type Error = RPCError;
type Error = (Protocol, RPCError);
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
@@ -717,10 +717,12 @@ where
)
.await
{
Err(e) => Err(RPCError::from(e)),
Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))),
Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
Ok((Some(Err(e)), _)) => Err(e),
Ok((None, _)) => Err(RPCError::IncompleteStream),
Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)),
Ok((None, _)) => {
Err((versioned_protocol.protocol(), RPCError::IncompleteStream))
}
}
}
}

View File

@@ -5,8 +5,12 @@ use crate::common::spec_with_all_forks_enabled;
use crate::common::{Protocol, build_tracing_subscriber};
use bls::Signature;
use fixed_bytes::FixedBytesExtended;
use libp2p::PeerId;
use lighthouse_network::rpc::{RequestType, methods::*};
use lighthouse_network::service::api_types::AppRequestId;
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId,
};
use lighthouse_network::{NetworkEvent, ReportSource, Response};
use ssz::Encode;
use ssz_types::{RuntimeVariableList, VariableList};
@@ -1785,3 +1789,157 @@ fn test_active_requests() {
}
})
}
// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count,
// it bans the sender.
#[test]
fn test_request_too_large_blocks_by_range() {
let spec = Arc::new(spec_with_all_forks_enabled());
test_request_too_large(
AppRequestId::Sync(SyncRequestId::BlocksByRange(BlocksByRangeRequestId {
id: 1,
parent_request_id: ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
})),
RequestType::BlocksByRange(OldBlocksByRangeRequest::new(
0,
spec.max_request_blocks(ForkName::Base) as u64 + 1, // exceeds the max request defined in the spec.
1,
)),
);
}
// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count,
// it bans the sender.
#[test]
fn test_request_too_large_blobs_by_range() {
let spec = Arc::new(spec_with_all_forks_enabled());
let max_request_blobs_count = spec.max_request_blob_sidecars(ForkName::Base) as u64
/ spec.max_blobs_per_block_within_fork(ForkName::Base);
test_request_too_large(
AppRequestId::Sync(SyncRequestId::BlobsByRange(BlobsByRangeRequestId {
id: 1,
parent_request_id: ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
})),
RequestType::BlobsByRange(BlobsByRangeRequest {
start_slot: 0,
count: max_request_blobs_count + 1, // exceeds the max request defined in the spec.
}),
);
}
// Test that when a node receives an invalid DataColumnsByRange request exceeding the columns count,
// it bans the sender.
#[test]
fn test_request_too_large_data_columns_by_range() {
test_request_too_large(
AppRequestId::Sync(SyncRequestId::DataColumnsByRange(
DataColumnsByRangeRequestId {
id: 1,
parent_request_id: DataColumnsByRangeRequester::ComponentsByRange(
ComponentsByRangeRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
},
},
),
peer: PeerId::random(),
},
)),
RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
start_slot: 0,
count: 0,
// exceeds the max request defined in the spec.
columns: vec![0; E::number_of_columns() + 1],
}),
);
}
fn test_request_too_large(app_request_id: AppRequestId, request: RequestType<E>) {
// Set up the logging.
let log_level = "debug";
let enable_logging = true;
let _subscriber = build_tracing_subscriber(log_level, enable_logging);
let rt = Arc::new(Runtime::new().unwrap());
let spec = Arc::new(spec_with_all_forks_enabled());
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
Protocol::Tcp,
false,
None,
)
.await;
// Build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
debug!(?request, %peer_id, "Sending RPC request");
sender
.send_request(peer_id, app_request_id, request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
app_request_id,
response,
..
} => {
debug!(?app_request_id, ?response, "Received response");
}
NetworkEvent::RPCFailed { error, .. } => {
// This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit.
debug!(?error, "RPC failed");
unreachable!();
}
NetworkEvent::PeerDisconnected(peer_id) => {
// The receiver should disconnect as a result of the invalid request.
debug!(%peer_id, "Peer disconnected");
// End the test.
return;
}
_ => {}
}
}
}
.instrument(info_span!("Sender"));
// Build the receiver future
let receiver_future = async {
loop {
if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await {
// This event should be unreachable, as the handler drops the invalid request.
unreachable!();
}
}
}
.instrument(info_span!("Receiver"));
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
});
}

View File

@@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock<Result<Histogram>>
]),
)
});
pub static SYNCING_CHAIN_BATCHES: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"sync_batches",
"Number of batches in sync chains by sync type and state",
&["sync_type", "state"],
)
});
pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"sync_single_block_lookups",

View File

@@ -977,7 +977,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
// remove all skip slots i.e. duplicated roots
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
Ok(block_roots
.into_iter()
.unique_by(|(root, _)| *root)
.collect::<Vec<_>>())
}
/// Handle a `BlobsByRange` request from the peer.

View File

@@ -120,6 +120,39 @@ impl TestRig {
.await
}
pub async fn new_with_skip_slots(chain_length: u64, skip_slots: &HashSet<u64>) -> Self {
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
let spec = Arc::new(spec);
let beacon_processor_config = BeaconProcessorConfig::default();
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.node_custody_type(NodeCustodyType::Fullnode)
.chain_config(<_>::default())
.build();
harness.advance_slot();
for slot in 1..=chain_length {
if !skip_slots.contains(&slot) {
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
}
harness.advance_slot();
}
Self::from_harness(harness, beacon_processor_config, spec).await
}
pub async fn new_parametric(
chain_length: u64,
beacon_processor_config: BeaconProcessorConfig,
@@ -150,6 +183,14 @@ impl TestRig {
harness.advance_slot();
}
Self::from_harness(harness, beacon_processor_config, spec).await
}
async fn from_harness(
harness: BeaconChainHarness<T>,
beacon_processor_config: BeaconProcessorConfig,
spec: Arc<ChainSpec>,
) -> Self {
let head = harness.chain.head_snapshot();
assert_eq!(
@@ -1986,3 +2027,78 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
"Should have received at least some data columns"
);
}
/// Test that DataColumnsByRange does not return duplicate data columns for skip slots.
///
/// When skip slots occur, `forwards_iter_block_roots` returns the same block root for
/// consecutive slots. The deduplication in `get_block_roots_from_store` must use
/// `unique_by` on the root (not the full `(root, slot)` tuple) to avoid serving
/// duplicate data columns for the same block.
#[tokio::test]
async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
// Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
// After 4 epochs, finalized_epoch=2 (finalized_slot=64). Requesting slots 0-9
// satisfies req_start_slot + req_count <= finalized_slot (10 <= 64), which routes
// through `get_block_roots_from_store` — the code path with the bug.
let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;
let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0)));
let requested_column = vec![all_custody_columns[0]];
// Request a range that spans the skip slots (slots 0 through 9).
let start_slot = 0;
let slot_count = 10;
rig.network_beacon_processor
.send_data_columns_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
DataColumnsByRangeRequest {
start_slot,
count: slot_count,
columns: requested_column.clone(),
},
)
.unwrap();
// Collect block roots from all data column responses.
let mut block_roots: Vec<Hash256> = Vec::new();
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::DataColumnsByRange(data_column),
inbound_request_id: _,
} = next
{
if let Some(column) = data_column {
block_roots.push(column.block_root());
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
assert!(
!block_roots.is_empty(),
"Should have received at least some data columns"
);
// Before the fix, skip slots caused the same block root to appear multiple times
// (once per skip slot) because .unique() on (Hash256, Slot) tuples didn't deduplicate.
let unique_roots: HashSet<_> = block_roots.iter().collect();
assert_eq!(
block_roots.len(),
unique_roots.len(),
"Response contained duplicate block roots: got {} columns but only {} unique roots",
block_roots.len(),
unique_roots.len(),
);
}

View File

@@ -8,9 +8,11 @@
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::batch::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
BatchProcessingResult, BatchState,
};
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::manager::BatchProcessResult;
@@ -31,6 +33,7 @@ use std::collections::{
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error, info, warn};
use types::{ColumnIndex, Epoch, EthSpec};
@@ -1181,6 +1184,21 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.epoch(T::EthSpec::slots_per_epoch())
}
pub fn register_metrics(&self) {
for state in BatchMetricsState::iter() {
let count = self
.batches
.values()
.filter(|b| b.state().metrics_state() == state)
.count();
metrics::set_gauge_vec(
&metrics::SYNCING_CHAIN_BATCHES,
&["backfill", state.into()],
count as i64,
);
}
}
/// Updates the global network state indicating the current state of a backfill sync.
fn set_state(&self, state: BackFillState) {
*self.network_globals.backfill_state.write() = state;

View File

@@ -10,10 +10,22 @@ use std::marker::PhantomData;
use std::ops::Sub;
use std::time::Duration;
use std::time::Instant;
use strum::Display;
use strum::{Display, EnumIter, IntoStaticStr};
use types::Slot;
use types::{DataColumnSidecarList, Epoch, EthSpec};
/// Batch states used as metrics labels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum BatchMetricsState {
AwaitingDownload,
Downloading,
AwaitingProcessing,
Processing,
AwaitingValidation,
Failed,
}
pub type BatchId = Epoch;
/// Type of expected batch.
@@ -142,6 +154,18 @@ impl<D: Hash> BatchState<D> {
pub fn poison(&mut self) -> BatchState<D> {
std::mem::replace(self, BatchState::Poisoned)
}
/// Returns the metrics state for this batch.
pub fn metrics_state(&self) -> BatchMetricsState {
match self {
BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload,
BatchState::Downloading(_) => BatchMetricsState::Downloading,
BatchState::AwaitingProcessing(..) => BatchMetricsState::AwaitingProcessing,
BatchState::Processing(_) => BatchMetricsState::Processing,
BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation,
BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed,
}
}
}
impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {

View File

@@ -12,14 +12,16 @@ use lighthouse_network::{
};
use logging::crit;
use std::hash::{DefaultHasher, Hash, Hasher};
use strum::IntoEnumIterator;
use tracing::{debug, error, info, info_span, warn};
use types::{DataColumnSidecarList, Epoch, EthSpec};
use crate::metrics;
use crate::sync::{
backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart},
batch::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
ByRangeRequestType,
BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
BatchProcessingResult, BatchState, ByRangeRequestType,
},
block_sidecar_coupling::CouplingError,
manager::CustodyBatchProcessResult,
@@ -1114,6 +1116,21 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
*self.network_globals.custody_sync_state.write() = state;
}
pub fn register_metrics(&self) {
for state in BatchMetricsState::iter() {
let count = self
.batches
.values()
.filter(|b| b.state().metrics_state() == state)
.count();
metrics::set_gauge_vec(
&metrics::SYNCING_CHAIN_BATCHES,
&["custody_backfill", state.into()],
count as i64,
);
}
}
/// A fully synced peer has joined us.
/// If we are in a failed state, update a local variable to indicate we are able to restart
/// the failed sync on the next attempt.

View File

@@ -784,6 +784,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
_ = register_metrics_interval.tick() => {
self.network.register_metrics();
self.range_sync.register_metrics();
self.backfill_sync.register_metrics();
self.custody_backfill_sync.register_metrics();
}
_ = epoch_interval.tick() => {
self.update_sync_state();

View File

@@ -3,7 +3,8 @@ use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::batch::BatchId;
use crate::sync::batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult,
BatchState,
};
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
@@ -234,6 +235,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.sum()
}
/// Returns the number of batches in the given metrics state.
pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize {
self.batches
.values()
.filter(|b| b.state().metrics_state() == state)
.count()
}
/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {

View File

@@ -6,6 +6,7 @@
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::batch::BatchMetricsState;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
@@ -17,6 +18,7 @@ use smallvec::SmallVec;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error};
use types::EthSpec;
use types::{Epoch, Hash256, Slot};
@@ -351,7 +353,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.iter()
.map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id))
.collect::<Vec<_>>();
preferred_ids.sort_unstable();
// Sort in descending order
preferred_ids.sort_unstable_by(|a, b| b.cmp(a));
let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
for (_, _, id) in preferred_ids {
@@ -515,6 +518,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
}
}
pub fn register_metrics(&self) {
for (sync_type, chains) in [
("range_finalized", &self.finalized_chains),
("range_head", &self.head_chains),
] {
for state in BatchMetricsState::iter() {
let count: usize = chains
.values()
.map(|chain| chain.count_batches_in_state(state))
.sum();
metrics::set_gauge_vec(
&metrics::SYNCING_CHAIN_BATCHES,
&[sync_type, state.into()],
count as i64,
);
}
}
}
fn update_metrics(&self) {
metrics::set_gauge_vec(
&metrics::SYNCING_CHAINS_COUNT,

View File

@@ -371,6 +371,10 @@ where
.update(network, &local, &mut self.awaiting_head_peers);
}
pub fn register_metrics(&self) {
self.chains.register_metrics();
}
/// Kickstarts sync.
pub fn resume(&mut self, network: &mut SyncNetworkContext<T>) {
for (removed_chain, sync_type, remove_reason) in

View File

@@ -76,8 +76,6 @@ const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
// Generally the timeout for events should be longer than a slot.
const HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER: u32 = 50;
const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
/// A struct to define a variety of different timeouts for different validator tasks to ensure
@@ -98,7 +96,6 @@ pub struct Timeouts {
pub get_debug_beacon_states: Duration,
pub get_deposit_snapshot: Duration,
pub get_validator_block: Duration,
pub events: Duration,
pub default: Duration,
}
@@ -119,7 +116,6 @@ impl Timeouts {
get_debug_beacon_states: timeout,
get_deposit_snapshot: timeout,
get_validator_block: timeout,
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * timeout,
default: timeout,
}
}
@@ -142,7 +138,6 @@ impl Timeouts {
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * base_timeout,
default: base_timeout / HTTP_DEFAULT_TIMEOUT_QUOTIENT,
}
}
@@ -2805,10 +2800,14 @@ impl BeaconNodeHttpClient {
.join(",");
path.query_pairs_mut().append_pair("topics", &topic_string);
// Do not use a timeout for the events endpoint. Using a regular timeout will trigger a
// timeout every `timeout` seconds, regardless of any data streamed from the endpoint.
// In future we could add a read_timeout, but that can only be configured globally on the
// Client.
let mut es = self
.client
.get(path)
.timeout(self.timeouts.events)
.timeout(Duration::MAX)
.eventsource()
.map_err(Error::SseEventSource)?;
// If we don't await `Event::Open` here, then the consumer