mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-18 21:38:31 +00:00
Add range sync metrics to track efficiency (#6095)
* Add more range sync metrics to track efficiency * Add ignored blocks metrics
This commit is contained in:
committed by
Age Manning
parent
cb1e8dc3f9
commit
4bfca8251d
@@ -237,6 +237,36 @@ lazy_static! {
|
|||||||
"Number of Syncing chains in range, per range type",
|
"Number of Syncing chains in range, per range type",
|
||||||
&["range_type"]
|
&["range_type"]
|
||||||
);
|
);
|
||||||
|
pub static ref SYNCING_CHAINS_REMOVED: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||||
|
"sync_range_removed_chains_total",
|
||||||
|
"Total count of range syncing chains removed per range type",
|
||||||
|
&["range_type"]
|
||||||
|
);
|
||||||
|
pub static ref SYNCING_CHAINS_ADDED: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||||
|
"sync_range_added_chains_total",
|
||||||
|
"Total count of range syncing chains added per range type",
|
||||||
|
&["range_type"]
|
||||||
|
);
|
||||||
|
pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||||
|
"sync_range_chains_dropped_blocks_total",
|
||||||
|
"Total count of dropped blocks when removing a syncing chain per range type",
|
||||||
|
&["range_type"]
|
||||||
|
);
|
||||||
|
pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||||
|
"sync_range_chains_ignored_blocks_total",
|
||||||
|
"Total count of ignored blocks when processing a syncing chain batch per chain type",
|
||||||
|
&["chain_type"]
|
||||||
|
);
|
||||||
|
pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||||
|
"sync_range_chains_processed_batches_total",
|
||||||
|
"Total count of processed batches in a syncing chain batch per chain type",
|
||||||
|
&["chain_type"]
|
||||||
|
);
|
||||||
|
pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
|
||||||
|
"sync_range_chain_batch_awaiting_processing_seconds",
|
||||||
|
"Time range sync batches spend in AwaitingProcessing state",
|
||||||
|
Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0])
|
||||||
|
);
|
||||||
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
|
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
|
||||||
"sync_single_block_lookups",
|
"sync_single_block_lookups",
|
||||||
"Number of single block lookups underway"
|
"Number of single block lookups underway"
|
||||||
|
|||||||
@@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
|
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
(_, Ok(_)) => {
|
(imported_blocks, Ok(_)) => {
|
||||||
debug!(self.log, "Batch processed";
|
debug!(self.log, "Batch processed";
|
||||||
"batch_epoch" => epoch,
|
"batch_epoch" => epoch,
|
||||||
"first_block_slot" => start_slot,
|
"first_block_slot" => start_slot,
|
||||||
@@ -335,7 +335,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"processed_blocks" => sent_blocks,
|
"processed_blocks" => sent_blocks,
|
||||||
"service"=> "sync");
|
"service"=> "sync");
|
||||||
BatchProcessResult::Success {
|
BatchProcessResult::Success {
|
||||||
was_non_empty: sent_blocks > 0,
|
sent_blocks,
|
||||||
|
imported_blocks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(imported_blocks, Err(e)) => {
|
(imported_blocks, Err(e)) => {
|
||||||
@@ -349,7 +350,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"service" => "sync");
|
"service" => "sync");
|
||||||
match e.peer_action {
|
match e.peer_action {
|
||||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||||
imported_blocks: imported_blocks > 0,
|
imported_blocks,
|
||||||
penalty,
|
penalty,
|
||||||
},
|
},
|
||||||
None => BatchProcessResult::NonFaultyFailure,
|
None => BatchProcessResult::NonFaultyFailure,
|
||||||
@@ -368,7 +369,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
.sum::<usize>();
|
.sum::<usize>();
|
||||||
|
|
||||||
match self.process_backfill_blocks(downloaded_blocks) {
|
match self.process_backfill_blocks(downloaded_blocks) {
|
||||||
(_, Ok(_)) => {
|
(imported_blocks, Ok(_)) => {
|
||||||
debug!(self.log, "Backfill batch processed";
|
debug!(self.log, "Backfill batch processed";
|
||||||
"batch_epoch" => epoch,
|
"batch_epoch" => epoch,
|
||||||
"first_block_slot" => start_slot,
|
"first_block_slot" => start_slot,
|
||||||
@@ -377,7 +378,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"processed_blobs" => n_blobs,
|
"processed_blobs" => n_blobs,
|
||||||
"service"=> "sync");
|
"service"=> "sync");
|
||||||
BatchProcessResult::Success {
|
BatchProcessResult::Success {
|
||||||
was_non_empty: sent_blocks > 0,
|
sent_blocks,
|
||||||
|
imported_blocks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(_, Err(e)) => {
|
(_, Err(e)) => {
|
||||||
@@ -390,7 +392,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"service" => "sync");
|
"service" => "sync");
|
||||||
match e.peer_action {
|
match e.peer_action {
|
||||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||||
imported_blocks: false,
|
imported_blocks: 0,
|
||||||
penalty,
|
penalty,
|
||||||
},
|
},
|
||||||
None => BatchProcessResult::NonFaultyFailure,
|
None => BatchProcessResult::NonFaultyFailure,
|
||||||
|
|||||||
@@ -528,7 +528,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
|||||||
// result callback. This is done, because an empty batch could end a chain and the logic
|
// result callback. This is done, because an empty batch could end a chain and the logic
|
||||||
// for removing chains and checking completion is in the callback.
|
// for removing chains and checking completion is in the callback.
|
||||||
|
|
||||||
let blocks = match batch.start_processing() {
|
let (blocks, _) = match batch.start_processing() {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return self
|
return self
|
||||||
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
||||||
@@ -615,13 +615,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
|||||||
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
|
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
BatchProcessResult::Success { was_non_empty } => {
|
BatchProcessResult::Success {
|
||||||
|
imported_blocks, ..
|
||||||
|
} => {
|
||||||
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
|
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
|
||||||
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
||||||
}
|
}
|
||||||
// If the processed batch was not empty, we can validate previous unvalidated
|
// If the processed batch was not empty, we can validate previous unvalidated
|
||||||
// blocks.
|
// blocks.
|
||||||
if *was_non_empty {
|
if *imported_blocks > 0 {
|
||||||
self.advance_chain(network, batch_id);
|
self.advance_chain(network, batch_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -677,7 +679,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
|||||||
|
|
||||||
Ok(BatchOperationOutcome::Continue) => {
|
Ok(BatchOperationOutcome::Continue) => {
|
||||||
// chain can continue. Check if it can be progressed
|
// chain can continue. Check if it can be progressed
|
||||||
if *imported_blocks {
|
if *imported_blocks > 0 {
|
||||||
// At least one block was successfully verified and imported, then we can be sure all
|
// At least one block was successfully verified and imported, then we can be sure all
|
||||||
// previous batches are valid and we only need to download the current failed
|
// previous batches are valid and we only need to download the current failed
|
||||||
// batch.
|
// batch.
|
||||||
|
|||||||
@@ -156,11 +156,12 @@ pub enum BlockProcessingResult<E: EthSpec> {
|
|||||||
pub enum BatchProcessResult {
|
pub enum BatchProcessResult {
|
||||||
/// The batch was completed successfully. It carries whether the sent batch contained blocks.
|
/// The batch was completed successfully. It carries whether the sent batch contained blocks.
|
||||||
Success {
|
Success {
|
||||||
was_non_empty: bool,
|
sent_blocks: usize,
|
||||||
|
imported_blocks: usize,
|
||||||
},
|
},
|
||||||
/// The batch processing failed. It carries whether the processing imported any block.
|
/// The batch processing failed. It carries whether the processing imported any block.
|
||||||
FaultyFailure {
|
FaultyFailure {
|
||||||
imported_blocks: bool,
|
imported_blocks: usize,
|
||||||
penalty: PeerAction,
|
penalty: PeerAction,
|
||||||
},
|
},
|
||||||
NonFaultyFailure,
|
NonFaultyFailure,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use lighthouse_network::PeerId;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::ops::Sub;
|
use std::ops::Sub;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
use strum::Display;
|
use strum::Display;
|
||||||
use types::{Epoch, EthSpec, Slot};
|
use types::{Epoch, EthSpec, Slot};
|
||||||
|
|
||||||
@@ -118,7 +119,7 @@ pub enum BatchState<E: EthSpec> {
|
|||||||
/// The batch is being downloaded.
|
/// The batch is being downloaded.
|
||||||
Downloading(PeerId, Id),
|
Downloading(PeerId, Id),
|
||||||
/// The batch has been completely downloaded and is ready for processing.
|
/// The batch has been completely downloaded and is ready for processing.
|
||||||
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
|
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
|
||||||
/// The batch is being processed.
|
/// The batch is being processed.
|
||||||
Processing(Attempt),
|
Processing(Attempt),
|
||||||
/// The batch was successfully processed and is waiting to be validated.
|
/// The batch was successfully processed and is waiting to be validated.
|
||||||
@@ -210,13 +211,26 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
|||||||
match &self.state {
|
match &self.state {
|
||||||
BatchState::AwaitingDownload | BatchState::Failed => None,
|
BatchState::AwaitingDownload | BatchState::Failed => None,
|
||||||
BatchState::Downloading(peer_id, _)
|
BatchState::Downloading(peer_id, _)
|
||||||
| BatchState::AwaitingProcessing(peer_id, _)
|
| BatchState::AwaitingProcessing(peer_id, _, _)
|
||||||
| BatchState::Processing(Attempt { peer_id, .. })
|
| BatchState::Processing(Attempt { peer_id, .. })
|
||||||
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
|
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
|
||||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the count of stored pending blocks if in awaiting processing state
|
||||||
|
pub fn pending_blocks(&self) -> usize {
|
||||||
|
match &self.state {
|
||||||
|
BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(),
|
||||||
|
BatchState::AwaitingDownload
|
||||||
|
| BatchState::Downloading { .. }
|
||||||
|
| BatchState::Processing { .. }
|
||||||
|
| BatchState::AwaitingValidation { .. }
|
||||||
|
| BatchState::Poisoned
|
||||||
|
| BatchState::Failed => 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a BlocksByRange request associated with the batch.
|
/// Returns a BlocksByRange request associated with the batch.
|
||||||
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
|
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
|
||||||
(
|
(
|
||||||
@@ -293,7 +307,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let received = blocks.len();
|
let received = blocks.len();
|
||||||
self.state = BatchState::AwaitingProcessing(peer, blocks);
|
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
|
||||||
Ok(received)
|
Ok(received)
|
||||||
}
|
}
|
||||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||||
@@ -365,11 +379,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<E>>, WrongState> {
|
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
|
||||||
match self.state.poison() {
|
match self.state.poison() {
|
||||||
BatchState::AwaitingProcessing(peer, blocks) => {
|
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
|
||||||
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
|
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
|
||||||
Ok(blocks)
|
Ok((blocks, start_instant.elapsed()))
|
||||||
}
|
}
|
||||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||||
other => {
|
other => {
|
||||||
@@ -515,7 +529,7 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
|
|||||||
}) => write!(f, "AwaitingValidation({})", peer_id),
|
}) => write!(f, "AwaitingValidation({})", peer_id),
|
||||||
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
|
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
|
||||||
BatchState::Failed => f.write_str("Failed"),
|
BatchState::Failed => f.write_str("Failed"),
|
||||||
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
|
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
|
||||||
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
|
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
|
||||||
}
|
}
|
||||||
BatchState::Downloading(peer, request_id) => {
|
BatchState::Downloading(peer, request_id) => {
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
|
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
|
||||||
|
use super::RangeSyncType;
|
||||||
|
use crate::metrics;
|
||||||
use crate::network_beacon_processor::ChainSegmentProcessId;
|
use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||||
use crate::sync::network_context::RangeRequestId;
|
use crate::sync::network_context::RangeRequestId;
|
||||||
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
|
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
|
||||||
@@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
|
|||||||
use slog::{crit, debug, o, warn};
|
use slog::{crit, debug, o, warn};
|
||||||
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
|
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
|
use strum::IntoStaticStr;
|
||||||
use types::{Epoch, EthSpec, Hash256, Slot};
|
use types::{Epoch, EthSpec, Hash256, Slot};
|
||||||
|
|
||||||
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
|
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
|
||||||
@@ -53,6 +56,13 @@ pub struct KeepChain;
|
|||||||
pub type ChainId = u64;
|
pub type ChainId = u64;
|
||||||
pub type BatchId = Epoch;
|
pub type BatchId = Epoch;
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone, IntoStaticStr)]
|
||||||
|
pub enum SyncingChainType {
|
||||||
|
Head,
|
||||||
|
Finalized,
|
||||||
|
Backfill,
|
||||||
|
}
|
||||||
|
|
||||||
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
|
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
|
||||||
/// root are grouped into the peer pool and queried for batches when downloading the
|
/// root are grouped into the peer pool and queried for batches when downloading the
|
||||||
/// chain.
|
/// chain.
|
||||||
@@ -60,6 +70,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
|
|||||||
/// A random id used to identify this chain.
|
/// A random id used to identify this chain.
|
||||||
id: ChainId,
|
id: ChainId,
|
||||||
|
|
||||||
|
/// SyncingChain type
|
||||||
|
pub chain_type: SyncingChainType,
|
||||||
|
|
||||||
/// The start of the chain segment. Any epoch previous to this one has been validated.
|
/// The start of the chain segment. Any epoch previous to this one has been validated.
|
||||||
pub start_epoch: Epoch,
|
pub start_epoch: Epoch,
|
||||||
|
|
||||||
@@ -126,6 +139,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
target_head_slot: Slot,
|
target_head_slot: Slot,
|
||||||
target_head_root: Hash256,
|
target_head_root: Hash256,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
|
chain_type: SyncingChainType,
|
||||||
log: &slog::Logger,
|
log: &slog::Logger,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mut peers = FnvHashMap::default();
|
let mut peers = FnvHashMap::default();
|
||||||
@@ -135,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
|
|
||||||
SyncingChain {
|
SyncingChain {
|
||||||
id,
|
id,
|
||||||
|
chain_type,
|
||||||
start_epoch,
|
start_epoch,
|
||||||
target_head_slot,
|
target_head_slot,
|
||||||
target_head_root,
|
target_head_root,
|
||||||
@@ -171,6 +186,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
self.validated_batches * EPOCHS_PER_BATCH
|
self.validated_batches * EPOCHS_PER_BATCH
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the total count of pending blocks in all the batches of this chain
|
||||||
|
pub fn pending_blocks(&self) -> usize {
|
||||||
|
self.batches
|
||||||
|
.values()
|
||||||
|
.map(|batch| batch.pending_blocks())
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes a peer from the chain.
|
/// Removes a peer from the chain.
|
||||||
/// If the peer has active batches, those are considered failed and re-requested.
|
/// If the peer has active batches, those are considered failed and re-requested.
|
||||||
pub fn remove_peer(
|
pub fn remove_peer(
|
||||||
@@ -305,7 +328,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
// result callback. This is done, because an empty batch could end a chain and the logic
|
// result callback. This is done, because an empty batch could end a chain and the logic
|
||||||
// for removing chains and checking completion is in the callback.
|
// for removing chains and checking completion is in the callback.
|
||||||
|
|
||||||
let blocks = batch.start_processing()?;
|
let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
|
||||||
|
metrics::observe_duration(
|
||||||
|
&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
|
||||||
|
duration_in_awaiting_processing,
|
||||||
|
);
|
||||||
|
|
||||||
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
|
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
|
||||||
self.current_processing_batch = Some(batch_id);
|
self.current_processing_batch = Some(batch_id);
|
||||||
|
|
||||||
@@ -469,10 +497,27 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
// We consider three cases. Batch was successfully processed, Batch failed processing due
|
// We consider three cases. Batch was successfully processed, Batch failed processing due
|
||||||
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
|
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
|
||||||
match result {
|
match result {
|
||||||
BatchProcessResult::Success { was_non_empty } => {
|
BatchProcessResult::Success {
|
||||||
|
sent_blocks,
|
||||||
|
imported_blocks,
|
||||||
|
} => {
|
||||||
|
if sent_blocks > imported_blocks {
|
||||||
|
let ignored_blocks = sent_blocks - imported_blocks;
|
||||||
|
metrics::inc_counter_vec_by(
|
||||||
|
&metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
|
||||||
|
&[self.chain_type.into()],
|
||||||
|
ignored_blocks as u64,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
metrics::inc_counter_vec(
|
||||||
|
&metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
|
||||||
|
&[self.chain_type.into()],
|
||||||
|
);
|
||||||
|
|
||||||
batch.processing_completed(BatchProcessingResult::Success)?;
|
batch.processing_completed(BatchProcessingResult::Success)?;
|
||||||
|
|
||||||
if *was_non_empty {
|
// was not empty = sent_blocks > 0
|
||||||
|
if *sent_blocks > 0 {
|
||||||
// If the processed batch was not empty, we can validate previous unvalidated
|
// If the processed batch was not empty, we can validate previous unvalidated
|
||||||
// blocks.
|
// blocks.
|
||||||
self.advance_chain(network, batch_id);
|
self.advance_chain(network, batch_id);
|
||||||
@@ -515,7 +560,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
|
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
|
||||||
BatchOperationOutcome::Continue => {
|
BatchOperationOutcome::Continue => {
|
||||||
// Chain can continue. Check if it can be moved forward.
|
// Chain can continue. Check if it can be moved forward.
|
||||||
if *imported_blocks {
|
if *imported_blocks > 0 {
|
||||||
// At least one block was successfully verified and imported, so we can be sure all
|
// At least one block was successfully verified and imported, so we can be sure all
|
||||||
// previous batches are valid and we only need to download the current failed
|
// previous batches are valid and we only need to download the current failed
|
||||||
// batch.
|
// batch.
|
||||||
@@ -1142,3 +1187,12 @@ impl RemoveChain {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<RangeSyncType> for SyncingChainType {
|
||||||
|
fn from(value: RangeSyncType) -> Self {
|
||||||
|
match value {
|
||||||
|
RangeSyncType::Head => Self::Head,
|
||||||
|
RangeSyncType::Finalized => Self::Finalized,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -64,8 +64,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
|||||||
|
|
||||||
/// Updates the Syncing state of the collection after a chain is removed.
|
/// Updates the Syncing state of the collection after a chain is removed.
|
||||||
fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) {
|
fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) {
|
||||||
let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
|
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_REMOVED, &[sync_type.as_str()]);
|
||||||
.map(|m| m.dec());
|
self.update_metrics();
|
||||||
|
|
||||||
match self.state {
|
match self.state {
|
||||||
RangeSyncState::Finalized(ref syncing_id) => {
|
RangeSyncState::Finalized(ref syncing_id) => {
|
||||||
@@ -493,15 +493,28 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
|||||||
target_head_slot,
|
target_head_slot,
|
||||||
target_head_root,
|
target_head_root,
|
||||||
peer,
|
peer,
|
||||||
|
sync_type.into(),
|
||||||
&self.log,
|
&self.log,
|
||||||
);
|
);
|
||||||
debug_assert_eq!(new_chain.get_id(), id);
|
debug_assert_eq!(new_chain.get_id(), id);
|
||||||
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
|
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
|
||||||
entry.insert(new_chain);
|
entry.insert(new_chain);
|
||||||
let _ =
|
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
|
||||||
metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
|
self.update_metrics();
|
||||||
.map(|m| m.inc());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_metrics(&self) {
|
||||||
|
metrics::set_gauge_vec(
|
||||||
|
&metrics::SYNCING_CHAINS_COUNT,
|
||||||
|
&[RangeSyncType::Finalized.as_str()],
|
||||||
|
self.finalized_chains.len() as i64,
|
||||||
|
);
|
||||||
|
metrics::set_gauge_vec(
|
||||||
|
&metrics::SYNCING_CHAINS_COUNT,
|
||||||
|
&[RangeSyncType::Head.as_str()],
|
||||||
|
self.head_chains.len() as i64,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ use super::block_storage::BlockStorage;
|
|||||||
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
|
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
|
||||||
use super::chain_collection::ChainCollection;
|
use super::chain_collection::ChainCollection;
|
||||||
use super::sync_type::RangeSyncType;
|
use super::sync_type::RangeSyncType;
|
||||||
|
use crate::metrics;
|
||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
use crate::sync::network_context::SyncNetworkContext;
|
use crate::sync::network_context::SyncNetworkContext;
|
||||||
use crate::sync::BatchProcessResult;
|
use crate::sync::BatchProcessResult;
|
||||||
@@ -346,6 +347,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::inc_counter_vec_by(
|
||||||
|
&metrics::SYNCING_CHAINS_DROPPED_BLOCKS,
|
||||||
|
&[sync_type.as_str()],
|
||||||
|
chain.pending_blocks() as u64,
|
||||||
|
);
|
||||||
|
|
||||||
network.status_peers(self.beacon_chain.as_ref(), chain.peers());
|
network.status_peers(self.beacon_chain.as_ref(), chain.peers());
|
||||||
|
|
||||||
let status = self.beacon_chain.status_message();
|
let status = self.beacon_chain.status_message();
|
||||||
|
|||||||
Reference in New Issue
Block a user