mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 21:08:32 +00:00
Realized unrealized experimentation (#3322)
## Issue Addressed Add a flag that optionally enables unrealized vote tracking. Would like to test out on testnets and benchmark differences in methods of vote tracking. This PR includes a DB schema upgrade to enable to new vote tracking style. Co-authored-by: realbigsean <sean@sigmaprime.io> Co-authored-by: Paul Hauner <paul@paulhauner.com> Co-authored-by: sean <seananderson33@gmail.com> Co-authored-by: Mac L <mjladson@pm.me>
This commit is contained in:
@@ -6,7 +6,8 @@ use beacon_chain::{
|
||||
observed_operations::ObservationOutcome,
|
||||
sync_committee_verification::{self, Error as SyncCommitteeError},
|
||||
validator_monitor::get_block_delay_ms,
|
||||
BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock,
|
||||
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
|
||||
GossipVerifiedBlock,
|
||||
};
|
||||
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
|
||||
use slog::{crit, debug, error, info, trace, warn};
|
||||
@@ -899,7 +900,11 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
) {
|
||||
let block: Arc<_> = verified_block.block.clone();
|
||||
|
||||
match self.chain.process_block(verified_block).await {
|
||||
match self
|
||||
.chain
|
||||
.process_block(verified_block, CountUnrealized::True)
|
||||
.await
|
||||
{
|
||||
Ok(block_root) => {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@ use crate::beacon_processor::DuplicateCache;
|
||||
use crate::metrics;
|
||||
use crate::sync::manager::{BlockProcessType, SyncMessage};
|
||||
use crate::sync::{BatchProcessResult, ChainId};
|
||||
use beacon_chain::ExecutionPayloadError;
|
||||
use beacon_chain::{
|
||||
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
|
||||
};
|
||||
use beacon_chain::{CountUnrealized, ExecutionPayloadError};
|
||||
use lighthouse_network::PeerAction;
|
||||
use slog::{debug, error, info, warn};
|
||||
use std::sync::Arc;
|
||||
@@ -21,7 +21,7 @@ use types::{Epoch, Hash256, SignedBeaconBlock};
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ChainSegmentProcessId {
|
||||
/// Processing Id of a range syncing batch.
|
||||
RangeBatchId(ChainId, Epoch),
|
||||
RangeBatchId(ChainId, Epoch, CountUnrealized),
|
||||
/// Processing ID for a backfill syncing batch.
|
||||
BackSyncBatchId(Epoch),
|
||||
/// Processing Id of the parent lookup of a block.
|
||||
@@ -89,7 +89,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
};
|
||||
let slot = block.slot();
|
||||
let result = self.chain.process_block(block).await;
|
||||
let result = self.chain.process_block(block, CountUnrealized::True).await;
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
@@ -133,12 +133,15 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
) {
|
||||
let result = match sync_type {
|
||||
// this a request from the range sync
|
||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
|
||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, count_unrealized) => {
|
||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||
let sent_blocks = downloaded_blocks.len();
|
||||
|
||||
match self.process_blocks(downloaded_blocks.iter()).await {
|
||||
match self
|
||||
.process_blocks(downloaded_blocks.iter(), count_unrealized)
|
||||
.await
|
||||
{
|
||||
(_, Ok(_)) => {
|
||||
debug!(self.log, "Batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
@@ -207,7 +210,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
);
|
||||
// parent blocks are ordered from highest slot to lowest, so we need to process in
|
||||
// reverse
|
||||
match self.process_blocks(downloaded_blocks.iter().rev()).await {
|
||||
match self
|
||||
.process_blocks(downloaded_blocks.iter().rev(), CountUnrealized::True)
|
||||
.await
|
||||
{
|
||||
(imported_blocks, Err(e)) => {
|
||||
debug!(self.log, "Parent lookup failed"; "error" => %e.message);
|
||||
BatchProcessResult::Failed {
|
||||
@@ -231,9 +237,14 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
async fn process_blocks<'a>(
|
||||
&self,
|
||||
downloaded_blocks: impl Iterator<Item = &'a Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> (usize, Result<(), ChainSegmentFailed>) {
|
||||
let blocks: Vec<Arc<_>> = downloaded_blocks.cloned().collect();
|
||||
match self.chain.process_chain_segment(blocks).await {
|
||||
match self
|
||||
.chain
|
||||
.process_chain_segment(blocks, count_unrealized)
|
||||
.await
|
||||
{
|
||||
ChainSegmentResult::Successful { imported_blocks } => {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
|
||||
if imported_blocks > 0 {
|
||||
|
||||
@@ -532,7 +532,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
.parent_block_processed(chain_hash, result, &mut self.network),
|
||||
},
|
||||
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
|
||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
|
||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => {
|
||||
self.range_sync.handle_block_process_result(
|
||||
&mut self.network,
|
||||
chain_id,
|
||||
|
||||
@@ -2,7 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
|
||||
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, FailureMode};
|
||||
use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_chain::{BeaconChainTypes, CountUnrealized};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
use rand::seq::SliceRandom;
|
||||
@@ -100,6 +100,8 @@ pub struct SyncingChain<T: BeaconChainTypes> {
|
||||
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
|
||||
beacon_processor_send: Sender<BeaconWorkEvent<T>>,
|
||||
|
||||
is_finalized_segment: bool,
|
||||
|
||||
/// The chain's log.
|
||||
log: slog::Logger,
|
||||
}
|
||||
@@ -126,6 +128,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
target_head_root: Hash256,
|
||||
peer_id: PeerId,
|
||||
beacon_processor_send: Sender<BeaconWorkEvent<T>>,
|
||||
is_finalized_segment: bool,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let mut peers = FnvHashMap::default();
|
||||
@@ -148,6 +151,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
current_processing_batch: None,
|
||||
validated_batches: 0,
|
||||
beacon_processor_send,
|
||||
is_finalized_segment,
|
||||
log: log.new(o!("chain" => id)),
|
||||
}
|
||||
}
|
||||
@@ -302,7 +306,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
// for removing chains and checking completion is in the callback.
|
||||
|
||||
let blocks = batch.start_processing()?;
|
||||
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
|
||||
let count_unrealized = if self.is_finalized_segment {
|
||||
CountUnrealized::False
|
||||
} else {
|
||||
CountUnrealized::True
|
||||
};
|
||||
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
|
||||
self.current_processing_batch = Some(batch_id);
|
||||
|
||||
if let Err(e) = self
|
||||
|
||||
@@ -472,10 +472,10 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
||||
let collection = if let RangeSyncType::Finalized = sync_type {
|
||||
&mut self.finalized_chains
|
||||
let (collection, is_finalized) = if let RangeSyncType::Finalized = sync_type {
|
||||
(&mut self.finalized_chains, true)
|
||||
} else {
|
||||
&mut self.head_chains
|
||||
(&mut self.head_chains, false)
|
||||
};
|
||||
match collection.entry(id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
@@ -501,6 +501,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
target_head_root,
|
||||
peer,
|
||||
beacon_processor_send.clone(),
|
||||
is_finalized,
|
||||
&self.log,
|
||||
);
|
||||
debug_assert_eq!(new_chain.get_id(), id);
|
||||
|
||||
Reference in New Issue
Block a user