diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 283daa4279..c3a1ed55ec 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2978,7 +2978,7 @@ impl BeaconChain { metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES); let block_delay = self .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .ok_or(Error::UnableToComputeTimeAtSlot)?; fork_choice @@ -3855,7 +3855,7 @@ impl BeaconChain { let slot_delay = self .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .or_else(|| { warn!( self.log, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 1a53942562..c72c3d2cd4 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -68,6 +68,8 @@ pub struct ChainConfig { /// /// This is useful for block builders and testing. pub always_prepare_payload: bool, + /// Whether backfill sync processing should be rate-limited. + pub enable_backfill_rate_limiting: bool, } impl Default for ChainConfig { @@ -94,6 +96,7 @@ impl Default for ChainConfig { optimistic_finalized_sync: true, shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, always_prepare_payload: false, + enable_backfill_rate_limiting: true, } } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e80b6fd18c..329f072754 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -355,12 +355,6 @@ where while block.slot() % slots_per_epoch != 0 { block_slot = (block_slot / slots_per_epoch - 1) * slots_per_epoch; - debug!( - context.log(), - "Searching for aligned checkpoint block"; - "block_slot" => block_slot, - ); - debug!( context.log(), "Searching for aligned checkpoint block"; diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 70036945e0..8052d2a4fb 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -290,11 +290,20 @@ impl PeerManager { // If a peer is being banned, this trumps any temporary ban the peer might be // under. We no longer track it in the temporary ban list. - self.temporary_banned_peers.raw_remove(peer_id); - - // Inform the Swarm to ban the peer - self.events - .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); + if !self.temporary_banned_peers.raw_remove(peer_id) { + // If the peer is not already banned, inform the Swarm to ban the peer + self.events + .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); + // If the peer was in the process of being un-banned, remove it (a rare race + // condition) + self.events.retain(|event| { + if let PeerManagerEvent::UnBanned(unbanned_peer_id, _) = event { + unbanned_peer_id != peer_id // Remove matching peer ids + } else { + true + } + }); + } } } } @@ -562,8 +571,8 @@ impl PeerManager { Protocol::BlobsByRoot => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, - Protocol::MetaData => PeerAction::LowToleranceError, - Protocol::Status => PeerAction::LowToleranceError, + Protocol::MetaData => PeerAction::Fatal, + Protocol::Status => PeerAction::Fatal, } } RPCError::StreamTimeout => match direction { diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index a29f243c9e..24de83a61d 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -156,8 +156,10 @@ impl PeerManager { BanResult::BadScore => { // This is a faulty state error!(self.log, "Connected to a banned peer. Re-banning"; "peer_id" => %peer_id); - // Reban the peer + // Disconnect the peer. self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager); + // Re-ban the peer to prevent repeated errors. + self.events.push(PeerManagerEvent::Banned(peer_id, vec![])); return; } BanResult::BannedIp(ip_addr) => { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index ac8251ea6e..99b94b15bf 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1127,7 +1127,7 @@ impl Network { debug!(self.log, "Peer does not support gossipsub"; "peer_id" => %peer_id); self.peer_manager_mut().report_peer( &peer_id, - PeerAction::LowToleranceError, + PeerAction::Fatal, ReportSource::Gossipsub, Some(GoodbyeReason::Unknown), "does_not_support_gossipsub", diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 73535dc83c..cf66265f30 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -63,6 +63,7 @@ use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use types::{ Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, @@ -80,7 +81,9 @@ mod tests; mod work_reprocessing_queue; mod worker; -use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; +use crate::beacon_processor::work_reprocessing_queue::{ + QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, +}; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; /// The maximum size of the channel for work events to the `BeaconProcessor`. @@ -230,6 +233,7 @@ pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_upd pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const RPC_BLOCK: &str = "rpc_block"; pub const CHAIN_SEGMENT: &str = "chain_segment"; +pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; @@ -802,6 +806,9 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::BackfillSync(QueuedBackfillBatch { process_id, blocks }) => { + WorkEvent::chain_segment(process_id, blocks) + } } } } @@ -975,6 +982,10 @@ impl Work { Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, + Work::ChainSegment { + process_id: ChainSegmentProcessId::BackSyncBatchId { .. }, + .. + } => CHAIN_SEGMENT_BACKFILL, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, @@ -1142,23 +1153,23 @@ impl BeaconProcessor { FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN); let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); + + let chain = match self.beacon_chain.upgrade() { + Some(chain) => chain, + // No need to proceed any further if the beacon chain has been dropped, the client + // is shutting down. + None => return, + }; + // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); - let work_reprocessing_tx = { - if let Some(chain) = self.beacon_chain.upgrade() { - spawn_reprocess_scheduler( - ready_work_tx, - &self.executor, - chain.slot_clock.clone(), - self.log.clone(), - ) - } else { - // No need to proceed any further if the beacon chain has been dropped, the client - // is shutting down. - return; - } - }; + let work_reprocessing_tx = spawn_reprocess_scheduler( + ready_work_tx, + &self.executor, + chain.slot_clock.clone(), + self.log.clone(), + ); let executor = self.executor.clone(); @@ -1171,12 +1182,55 @@ impl BeaconProcessor { reprocess_work_rx: ready_work_rx, }; + let enable_backfill_rate_limiting = chain.config.enable_backfill_rate_limiting; + loop { let work_event = match inbound_events.next().await { Some(InboundEvent::WorkerIdle) => { self.current_workers = self.current_workers.saturating_sub(1); None } + Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { + match QueuedBackfillBatch::try_from(event) { + Ok(backfill_batch) => { + match work_reprocessing_tx + .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) + { + Err(e) => { + warn!( + self.log, + "Unable to queue backfill work event. Will try to process now."; + "error" => %e + ); + match e { + TrySendError::Full(reprocess_queue_message) + | TrySendError::Closed(reprocess_queue_message) => { + match reprocess_queue_message { + ReprocessQueueMessage::BackfillSync( + backfill_batch, + ) => Some(backfill_batch.into()), + other => { + crit!( + self.log, + "Unexpected queue message type"; + "message_type" => other.as_ref() + ); + // This is an unhandled exception, drop the message. + continue; + } + } + } + } + } + Ok(..) => { + // backfill work sent to "reprocessing" queue. Process the next event. + continue; + } + } + } + Err(event) => Some(event), + } + } Some(InboundEvent::WorkEvent(event)) | Some(InboundEvent::ReprocessingWork(event)) => Some(event), None => { diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index cf4934a668..00ca60ba99 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -9,7 +9,7 @@ use crate::{service::NetworkMessage, sync::SyncMessage}; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; -use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, rpc::methods::{MetaData, MetaDataV2}, @@ -23,8 +23,8 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, - SignedVoluntaryExit, SubnetId, + Attestation, AttesterSlashing, Epoch, EthSpec, MainnetEthSpec, ProposerSlashing, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; type E = MainnetEthSpec; @@ -70,6 +70,10 @@ impl Drop for TestRig { impl TestRig { pub async fn new(chain_length: u64) -> Self { + Self::new_with_chain_config(chain_length, ChainConfig::default()).await + } + + pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = E::default_spec(); spec.shard_committee_period = 2; @@ -78,6 +82,7 @@ impl TestRig { .spec(spec) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() + .chain_config(chain_config) .build(); harness.advance_slot(); @@ -261,6 +266,14 @@ impl TestRig { self.beacon_processor_tx.try_send(event).unwrap(); } + pub fn enqueue_backfill_batch(&self) { + let event = WorkEvent::chain_segment( + ChainSegmentProcessId::BackSyncBatchId(Epoch::default()), + Vec::default(), + ); + self.beacon_processor_tx.try_send(event).unwrap(); + } + pub fn enqueue_unaggregated_attestation(&self) { let (attestation, subnet_id) = self.attestations.first().unwrap().clone(); self.beacon_processor_tx @@ -873,3 +886,49 @@ async fn test_rpc_block_reprocessing() { // cache handle was dropped. assert_eq!(next_block_root, rig.head_root()); } + +/// Ensure that backfill batches get rate-limited and processing is scheduled at specified intervals. +#[tokio::test] +async fn test_backfill_sync_processing() { + let mut rig = TestRig::new(SMALL_CHAIN).await; + // Note: to verify the exact event times in an integration test is not straight forward here + // (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code) + // and makes the test very slow, hence timing calculation is unit tested separately in + // `work_reprocessing_queue`. + for _ in 0..1 { + rig.enqueue_backfill_batch(); + // ensure queued batch is not processed until later + rig.assert_no_events_for(Duration::from_millis(100)).await; + // A new batch should be processed within a slot. + rig.assert_event_journal_with_timeout( + &[CHAIN_SEGMENT_BACKFILL, WORKER_FREED, NOTHING_TO_DO], + rig.chain.slot_clock.slot_duration(), + ) + .await; + } +} + +/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. +#[tokio::test] +async fn test_backfill_sync_processing_rate_limiting_disabled() { + let chain_config = ChainConfig { + enable_backfill_rate_limiting: false, + ..Default::default() + }; + let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await; + + for _ in 0..3 { + rig.enqueue_backfill_batch(); + } + + // ensure all batches are processed + rig.assert_event_journal_with_timeout( + &[ + CHAIN_SEGMENT_BACKFILL, + CHAIN_SEGMENT_BACKFILL, + CHAIN_SEGMENT_BACKFILL, + ], + Duration::from_millis(100), + ) + .await; +} diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 0e4a08f5d6..969e1eea3d 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -11,6 +11,7 @@ //! Aggregated and unaggregated attestations that failed verification due to referencing an unknown //! block will be re-queued until their block is imported, or until they expire. use super::MAX_SCHEDULED_WORK_QUEUE_LEN; +use crate::beacon_processor::{ChainSegmentProcessId, Work, WorkEvent}; use crate::metrics; use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; @@ -18,14 +19,17 @@ use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_D use fnv::FnvHashMap; use futures::task::Poll; use futures::{Stream, StreamExt}; +use itertools::Itertools; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; +use std::future::Future; use std::pin::Pin; use std::task::Context; use std::time::Duration; +use strum::AsRefStr; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; @@ -64,7 +68,21 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; +// Process backfill batch 50%, 60%, 80% through each slot. +// +// Note: use caution to set these fractions in a way that won't cause panic-y +// arithmetic. +pub const BACKFILL_SCHEDULE_IN_SLOT: [(u32, u32); 3] = [ + // One half: 6s on mainnet, 2.5s on Gnosis. + (1, 2), + // Three fifths: 7.2s on mainnet, 3s on Gnosis. + (3, 5), + // Four fifths: 9.6s on mainnet, 4s on Gnosis. + (4, 5), +]; + /// Messages that the scheduler can receive. +#[derive(AsRefStr)] pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), @@ -83,6 +101,8 @@ pub enum ReprocessQueueMessage { UnknownBlockAggregate(QueuedAggregate), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), + /// A new backfill batch that needs to be scheduled for processing. + BackfillSync(QueuedBackfillBatch), } /// Events sent by the scheduler once they are ready for re-processing. @@ -92,6 +112,7 @@ pub enum ReadyWork { Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), LightClientUpdate(QueuedLightClientUpdate), + BackfillSync(QueuedBackfillBatch), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -143,6 +164,40 @@ pub struct QueuedRpcBlock { pub should_process: bool, } +/// A backfill batch work that has been queued for processing later. +#[derive(Clone)] +pub struct QueuedBackfillBatch { + pub process_id: ChainSegmentProcessId, + pub blocks: Vec>>, +} + +impl TryFrom> for QueuedBackfillBatch { + type Error = WorkEvent; + + fn try_from(event: WorkEvent) -> Result> { + match event { + WorkEvent { + work: + Work::ChainSegment { + process_id: process_id @ ChainSegmentProcessId::BackSyncBatchId(_), + blocks, + }, + .. + } => Ok(QueuedBackfillBatch { process_id, blocks }), + _ => Err(event), + } + } +} + +impl From> for WorkEvent { + fn from(queued_backfill_batch: QueuedBackfillBatch) -> WorkEvent { + WorkEvent::chain_segment( + queued_backfill_batch.process_id, + queued_backfill_batch.blocks, + ) + } +} + /// Unifies the different messages processed by the block delay queue. enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. @@ -154,6 +209,8 @@ enum InboundEvent { ReadyAttestation(QueuedAttestationId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), + /// A backfill batch that was queued is ready for processing. + ReadyBackfillSync(QueuedBackfillBatch), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -190,6 +247,8 @@ struct ReprocessQueue { queued_lc_updates: FnvHashMap, DelayKey)>, /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, + /// Queued backfill batches + queued_backfill_batches: Vec>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations @@ -199,6 +258,8 @@ struct ReprocessQueue { rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + next_backfill_batch_event: Option>>, + slot_clock: Pin>, } pub type QueuedLightClientUpdateId = usize; @@ -286,6 +347,20 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { + match next_backfill_batch_event.as_mut().poll(cx) { + Poll::Ready(_) => { + let maybe_batch = self.queued_backfill_batches.pop(); + self.recompute_next_backfill_batch_event(); + + if let Some(batch) = maybe_batch { + return Poll::Ready(Some(InboundEvent::ReadyBackfillSync(batch))); + } + } + Poll::Pending => (), + } + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -322,12 +397,15 @@ pub fn spawn_reprocess_scheduler( queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), + queued_backfill_batches: Vec::new(), next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + next_backfill_batch_event: None, + slot_clock: Box::pin(slot_clock.clone()), }; executor.spawn( @@ -678,6 +756,14 @@ impl ReprocessQueue { } } } + InboundEvent::Msg(BackfillSync(queued_backfill_batch)) => { + self.queued_backfill_batches + .insert(0, queued_backfill_batch); + // only recompute if there is no `next_backfill_batch_event` already scheduled + if self.next_backfill_batch_event.is_none() { + self.recompute_next_backfill_batch_event(); + } + } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.block.block_root; @@ -785,6 +871,33 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyBackfillSync(queued_backfill_batch) => { + let millis_from_slot_start = slot_clock + .millis_from_current_slot_start() + .map_or("null".to_string(), |duration| { + duration.as_millis().to_string() + }); + + debug!( + log, + "Sending scheduled backfill work"; + "millis_from_slot_start" => millis_from_slot_start + ); + + if self + .ready_work_tx + .try_send(ReadyWork::BackfillSync(queued_backfill_batch.clone())) + .is_err() + { + error!( + log, + "Failed to send scheduled backfill work"; + "info" => "sending work back to queue" + ); + self.queued_backfill_batches + .insert(0, queued_backfill_batch); + } + } } metrics::set_gauge_vec( @@ -808,4 +921,95 @@ impl ReprocessQueue { self.lc_updates_delay_queue.len() as i64, ); } + + fn recompute_next_backfill_batch_event(&mut self) { + // only recompute the `next_backfill_batch_event` if there are backfill batches in the queue + if !self.queued_backfill_batches.is_empty() { + self.next_backfill_batch_event = Some(Box::pin(tokio::time::sleep( + ReprocessQueue::::duration_until_next_backfill_batch_event(&self.slot_clock), + ))); + } else { + self.next_backfill_batch_event = None + } + } + + /// Returns duration until the next scheduled processing time. The schedule ensure that backfill + /// processing is done in windows of time that aren't critical + fn duration_until_next_backfill_batch_event(slot_clock: &T::SlotClock) -> Duration { + let slot_duration = slot_clock.slot_duration(); + slot_clock + .millis_from_current_slot_start() + .and_then(|duration_from_slot_start| { + BACKFILL_SCHEDULE_IN_SLOT + .into_iter() + // Convert fractions to seconds from slot start. + .map(|(multiplier, divisor)| (slot_duration / divisor) * multiplier) + .find_or_first(|&event_duration_from_slot_start| { + event_duration_from_slot_start > duration_from_slot_start + }) + .map(|next_event_time| { + if duration_from_slot_start >= next_event_time { + // event is in the next slot, add duration to next slot + let duration_to_next_slot = slot_duration - duration_from_slot_start; + duration_to_next_slot + next_event_time + } else { + next_event_time - duration_from_slot_start + } + }) + }) + // If we can't read the slot clock, just wait another slot. + .unwrap_or(slot_duration) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::builder::Witness; + use beacon_chain::eth1_chain::CachingEth1Backend; + use slot_clock::TestingSlotClock; + use store::MemoryStore; + use types::MainnetEthSpec as E; + use types::Slot; + + type TestBeaconChainType = + Witness, E, MemoryStore, MemoryStore>; + + #[test] + fn backfill_processing_schedule_calculation() { + let slot_duration = Duration::from_secs(12); + let slot_clock = TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), slot_duration); + let current_slot_start = slot_clock.start_of(Slot::new(100)).unwrap(); + slot_clock.set_current_time(current_slot_start); + + let event_times = BACKFILL_SCHEDULE_IN_SLOT + .map(|(multiplier, divisor)| (slot_duration / divisor) * multiplier); + + for &event_duration_from_slot_start in event_times.iter() { + let duration_to_next_event = + ReprocessQueue::::duration_until_next_backfill_batch_event( + &slot_clock, + ); + + let current_time = slot_clock.millis_from_current_slot_start().unwrap(); + + assert_eq!( + duration_to_next_event, + event_duration_from_slot_start - current_time + ); + + slot_clock.set_current_time(current_slot_start + event_duration_from_slot_start) + } + + // check for next event beyond the current slot + let duration_to_next_slot = slot_clock.duration_to_next_slot().unwrap(); + let duration_to_next_event = + ReprocessQueue::::duration_until_next_backfill_batch_event( + &slot_clock, + ); + assert_eq!( + duration_to_next_event, + duration_to_next_slot + event_times[0] + ); + } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 083330753a..de3d45b09e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -268,6 +268,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .min_values(0) .hidden(true) ) + .arg( + Arg::with_name("disable-backfill-rate-limiting") + .long("disable-backfill-rate-limiting") + .help("Disable the backfill sync rate-limiting. This allow users to just sync the entire chain as fast \ + as possible, however it can result in resource contention which degrades staking performance. Stakers \ + should generally choose to avoid this flag since backfill sync is not required for staking.") + .takes_value(false), + ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b9bdf1e965..288f849f4c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -796,6 +796,10 @@ pub fn get_config( client_config.always_prefer_builder_payload = true; } + // Backfill sync rate-limiting + client_config.chain.enable_backfill_rate_limiting = + !cli_args.is_present("disable-backfill-rate-limiting"); + Ok(client_config) } diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs index 88b5b68401..90c128751d 100644 --- a/common/eth2/src/lighthouse_vc/http_client.rs +++ b/common/eth2/src/lighthouse_vc/http_client.rs @@ -642,6 +642,30 @@ impl ValidatorClientHttpClient { let url = self.make_gas_limit_url(pubkey)?; self.delete_with_raw_response(url, &()).await } + + /// `POST /eth/v1/validator/{pubkey}/voluntary_exit` + pub async fn post_validator_voluntary_exit( + &self, + pubkey: &PublicKeyBytes, + epoch: Option, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("eth") + .push("v1") + .push("validator") + .push(&pubkey.to_string()) + .push("voluntary_exit"); + + if let Some(epoch) = epoch { + path.query_pairs_mut() + .append_pair("epoch", &epoch.to_string()); + } + + self.post(path, &()).await + } } /// Returns `Ok(response)` if the response is a `200 OK` response or a diff --git a/common/eth2/src/lighthouse_vc/types.rs b/common/eth2/src/lighthouse_vc/types.rs index 92439337f6..fa5d4ae119 100644 --- a/common/eth2/src/lighthouse_vc/types.rs +++ b/common/eth2/src/lighthouse_vc/types.rs @@ -144,3 +144,8 @@ pub struct UpdateGasLimitRequest { #[serde(with = "eth2_serde_utils::quoted_u64")] pub gas_limit: u64, } + +#[derive(Deserialize)] +pub struct VoluntaryExitQuery { + pub epoch: Option, +} diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index 760b2f9cdb..8f7bbc1b78 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -104,12 +104,23 @@ pub trait SlotClock: Send + Sync + Sized + Clone { self.slot_duration() * 2 / INTERVALS_PER_SLOT as u32 } - /// Returns the `Duration` since the start of the current `Slot`. Useful in determining whether to apply proposer boosts. - fn seconds_from_current_slot_start(&self, seconds_per_slot: u64) -> Option { + /// Returns the `Duration` since the start of the current `Slot` at seconds precision. Useful in determining whether to apply proposer boosts. + fn seconds_from_current_slot_start(&self) -> Option { self.now_duration() .and_then(|now| now.checked_sub(self.genesis_duration())) .map(|duration_into_slot| { - Duration::from_secs(duration_into_slot.as_secs() % seconds_per_slot) + Duration::from_secs(duration_into_slot.as_secs() % self.slot_duration().as_secs()) + }) + } + + /// Returns the `Duration` since the start of the current `Slot` at milliseconds precision. + fn millis_from_current_slot_start(&self) -> Option { + self.now_duration() + .and_then(|now| now.checked_sub(self.genesis_duration())) + .map(|duration_into_slot| { + Duration::from_millis( + (duration_into_slot.as_millis() % self.slot_duration().as_millis()) as u64, + ) }) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 8ea89f49de..b6327ade15 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1052,6 +1052,19 @@ fn disable_upnp_flag() { .with_config(|config| assert!(!config.network.upnp_enabled)); } #[test] +fn disable_backfill_rate_limiting_flag() { + CommandLineTest::new() + .flag("disable-backfill-rate-limiting", None) + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.enable_backfill_rate_limiting)); +} +#[test] +fn default_backfill_rate_limiting_flag() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.chain.enable_backfill_rate_limiting)); +} +#[test] fn default_boot_nodes() { let mainnet = vec![ // Lighthouse Team (Sigma Prime) diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 0e8c9a9ae9..49342d2114 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -425,7 +425,7 @@ impl Tester { .harness .chain .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .unwrap(); let result = self diff --git a/validator_client/src/http_api/create_signed_voluntary_exit.rs b/validator_client/src/http_api/create_signed_voluntary_exit.rs new file mode 100644 index 0000000000..b777d15806 --- /dev/null +++ b/validator_client/src/http_api/create_signed_voluntary_exit.rs @@ -0,0 +1,69 @@ +use crate::validator_store::ValidatorStore; +use bls::{PublicKey, PublicKeyBytes}; +use slog::{info, Logger}; +use slot_clock::SlotClock; +use std::sync::Arc; +use types::{Epoch, EthSpec, SignedVoluntaryExit, VoluntaryExit}; + +pub async fn create_signed_voluntary_exit( + pubkey: PublicKey, + maybe_epoch: Option, + validator_store: Arc>, + slot_clock: T, + log: Logger, +) -> Result { + let epoch = match maybe_epoch { + Some(epoch) => epoch, + None => get_current_epoch::(slot_clock).ok_or_else(|| { + warp_utils::reject::custom_server_error("Unable to determine current epoch".to_string()) + })?, + }; + + let pubkey_bytes = PublicKeyBytes::from(pubkey); + if !validator_store.has_validator(&pubkey_bytes) { + return Err(warp_utils::reject::custom_not_found(format!( + "{} is disabled or not managed by this validator client", + pubkey_bytes.as_hex_string() + ))); + } + + let validator_index = validator_store + .validator_index(&pubkey_bytes) + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "The validator index for {} is not known. The validator client \ + may still be initializing or the validator has not yet had a \ + deposit processed.", + pubkey_bytes.as_hex_string() + )) + })?; + + let voluntary_exit = VoluntaryExit { + epoch, + validator_index, + }; + + info!( + log, + "Signing voluntary exit"; + "validator" => pubkey_bytes.as_hex_string(), + "epoch" => epoch + ); + + let signed_voluntary_exit = validator_store + .sign_voluntary_exit(pubkey_bytes, voluntary_exit) + .await + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to sign voluntary exit: {:?}", + e + )) + })?; + + Ok(signed_voluntary_exit) +} + +/// Calculates the current epoch from the genesis time and current time. +fn get_current_epoch(slot_clock: T) -> Option { + slot_clock.now().map(|s| s.epoch(E::slots_per_epoch())) +} diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index b87bb08381..15b3f9fe09 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -1,9 +1,11 @@ mod api_secret; +mod create_signed_voluntary_exit; mod create_validator; mod keystores; mod remotekeys; mod tests; +use crate::http_api::create_signed_voluntary_exit::create_signed_voluntary_exit; use crate::{determine_graffiti, GraffitiFile, ValidatorStore}; use account_utils::{ mnemonic_from_phrase, @@ -71,6 +73,7 @@ pub struct Context { pub spec: ChainSpec, pub config: Config, pub log: Logger, + pub slot_clock: T, pub _phantom: PhantomData, } @@ -189,6 +192,9 @@ pub fn serve( let inner_ctx = ctx.clone(); let log_filter = warp::any().map(move || inner_ctx.log.clone()); + let inner_slot_clock = ctx.slot_clock.clone(); + let slot_clock_filter = warp::any().map(move || inner_slot_clock.clone()); + let inner_spec = Arc::new(ctx.spec.clone()); let spec_filter = warp::any().map(move || inner_spec.clone()); @@ -904,6 +910,46 @@ pub fn serve( ) .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT)); + // POST /eth/v1/validator/{pubkey}/voluntary_exit + let post_validators_voluntary_exits = eth_v1 + .and(warp::path("validator")) + .and(warp::path::param::()) + .and(warp::path("voluntary_exit")) + .and(warp::query::()) + .and(warp::path::end()) + .and(validator_store_filter.clone()) + .and(slot_clock_filter) + .and(log_filter.clone()) + .and(signer.clone()) + .and(task_executor_filter.clone()) + .and_then( + |pubkey: PublicKey, + query: api_types::VoluntaryExitQuery, + validator_store: Arc>, + slot_clock: T, + log, + signer, + task_executor: TaskExecutor| { + blocking_signed_json_task(signer, move || { + if let Some(handle) = task_executor.handle() { + let signed_voluntary_exit = + handle.block_on(create_signed_voluntary_exit( + pubkey, + query.epoch, + validator_store, + slot_clock, + log, + ))?; + Ok(signed_voluntary_exit) + } else { + Err(warp_utils::reject::custom_server_error( + "Lighthouse shutting down".into(), + )) + } + }) + }, + ); + // GET /eth/v1/keystores let get_std_keystores = std_keystores .and(signer.clone()) @@ -1001,6 +1047,7 @@ pub fn serve( .or(post_validators_keystore) .or(post_validators_mnemonic) .or(post_validators_web3signer) + .or(post_validators_voluntary_exits) .or(post_fee_recipient) .or(post_gas_limit) .or(post_std_keystores) diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index d453d7038a..df0e480444 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -45,6 +45,7 @@ struct ApiTester { initialized_validators: Arc>, validator_store: Arc>, url: SensitiveUrl, + slot_clock: TestingSlotClock, _server_shutdown: oneshot::Sender<()>, _validator_dir: TempDir, _runtime_shutdown: exit_future::Signal, @@ -90,8 +91,12 @@ impl ApiTester { let slashing_db_path = config.validator_dir.join(SLASHING_PROTECTION_FILENAME); let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap(); - let slot_clock = - TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1)); + let genesis_time: u64 = 0; + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(genesis_time), + Duration::from_secs(1), + ); let (runtime_shutdown, exit) = exit_future::signal(); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); @@ -101,9 +106,9 @@ impl ApiTester { initialized_validators, slashing_protection, Hash256::repeat_byte(42), - spec, + spec.clone(), Some(Arc::new(DoppelgangerService::new(log.clone()))), - slot_clock, + slot_clock.clone(), &config, executor.clone(), log.clone(), @@ -129,7 +134,8 @@ impl ApiTester { listen_port: 0, allow_origin: None, }, - log, + log: log.clone(), + slot_clock: slot_clock.clone(), _phantom: PhantomData, }); let ctx = context.clone(); @@ -156,6 +162,7 @@ impl ApiTester { initialized_validators, validator_store, url, + slot_clock, _server_shutdown: shutdown_tx, _validator_dir: validator_dir, _runtime_shutdown: runtime_shutdown, @@ -494,6 +501,33 @@ impl ApiTester { self } + pub async fn test_sign_voluntary_exits(self, index: usize, maybe_epoch: Option) -> Self { + let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; + // manually setting validator index in `ValidatorStore` + self.initialized_validators + .write() + .set_index(&validator.voting_pubkey, 0); + + let expected_exit_epoch = maybe_epoch.unwrap_or_else(|| self.get_current_epoch()); + + let resp = self + .client + .post_validator_voluntary_exit(&validator.voting_pubkey, maybe_epoch) + .await; + + assert!(resp.is_ok()); + assert_eq!(resp.unwrap().message.epoch, expected_exit_epoch); + + self + } + + fn get_current_epoch(&self) -> Epoch { + self.slot_clock + .now() + .map(|s| s.epoch(E::slots_per_epoch())) + .unwrap() + } + pub async fn set_validator_enabled(self, index: usize, enabled: bool) -> Self { let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; @@ -778,6 +812,29 @@ fn hd_validator_creation() { }); } +#[test] +fn validator_exit() { + let runtime = build_runtime(); + let weak_runtime = Arc::downgrade(&runtime); + runtime.block_on(async { + ApiTester::new(weak_runtime) + .await + .create_hd_validators(HdValidatorScenario { + count: 2, + specify_mnemonic: false, + key_derivation_path_offset: 0, + disabled: vec![], + }) + .await + .assert_enabled_validators_count(2) + .assert_validators_count(2) + .test_sign_voluntary_exits(0, None) + .await + .test_sign_voluntary_exits(0, Some(Epoch::new(256))) + .await; + }); +} + #[test] fn validator_enabling() { let runtime = build_runtime(); diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 3beb5dff19..6f05e17c36 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -93,6 +93,11 @@ lazy_static::lazy_static! { "Total count of attempted SyncSelectionProof signings", &["status"] ); + pub static ref SIGNED_VOLUNTARY_EXITS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_voluntary_exits_total", + "Total count of VoluntaryExit signings", + &["status"] + ); pub static ref SIGNED_VALIDATOR_REGISTRATIONS_TOTAL: Result = try_create_int_counter_vec( "builder_validator_registrations_total", "Total count of ValidatorRegistrationData signings", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 82cacccc60..556fdef26b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -94,6 +94,7 @@ pub struct ProductionValidatorClient { doppelganger_service: Option>, preparation_service: PreparationService, validator_store: Arc>, + slot_clock: SystemTimeSlotClock, http_api_listen_addr: Option, config: Config, } @@ -461,7 +462,7 @@ impl ProductionValidatorClient { let sync_committee_service = SyncCommitteeService::new( duties_service.clone(), validator_store.clone(), - slot_clock, + slot_clock.clone(), beacon_nodes.clone(), context.service_context("sync_committee".into()), ); @@ -482,6 +483,7 @@ impl ProductionValidatorClient { preparation_service, validator_store, config, + slot_clock, http_api_listen_addr: None, }) } @@ -544,6 +546,7 @@ impl ProductionValidatorClient { graffiti_flag: self.config.graffiti, spec: self.context.eth2_config.spec.clone(), config: self.config.http_api.clone(), + slot_clock: self.slot_clock.clone(), log: log.clone(), _phantom: PhantomData, }); diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index ae9df08096..0de2f2f54f 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -47,6 +47,7 @@ pub enum SignableMessage<'a, T: EthSpec, Payload: AbstractExecPayload = FullP }, SignedContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), + VoluntaryExit(&'a VoluntaryExit), } impl<'a, T: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, T, Payload> { @@ -67,6 +68,7 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, T, Pay } => beacon_block_root.signing_root(domain), SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), + SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), } } } @@ -203,6 +205,7 @@ impl SigningMethod { SignableMessage::ValidatorRegistration(v) => { Web3SignerObject::ValidatorRegistration(v) } + SignableMessage::VoluntaryExit(e) => Web3SignerObject::VoluntaryExit(e), }; // Determine the Web3Signer message type. diff --git a/validator_client/src/signing_method/web3signer.rs b/validator_client/src/signing_method/web3signer.rs index 512cbc7d02..54352394af 100644 --- a/validator_client/src/signing_method/web3signer.rs +++ b/validator_client/src/signing_method/web3signer.rs @@ -63,7 +63,6 @@ pub enum Web3SignerObject<'a, T: EthSpec, Payload: AbstractExecPayload> { RandaoReveal { epoch: Epoch, }, - #[allow(dead_code)] VoluntaryExit(&'a VoluntaryExit), SyncCommitteeMessage { beacon_block_root: Hash256, diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 36a0d05734..73843579a2 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -22,8 +22,9 @@ use types::{ AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, + ValidatorRegistrationData, VoluntaryExit, }; use validator_dir::ValidatorDir; @@ -155,6 +156,14 @@ impl ValidatorStore { self.validators.clone() } + /// Indicates if the `voting_public_key` exists in self and is enabled. + pub fn has_validator(&self, voting_public_key: &PublicKeyBytes) -> bool { + self.validators + .read() + .validator(voting_public_key) + .is_some() + } + /// Insert a new validator to `self`, where the validator is represented by an EIP-2335 /// keystore on the filesystem. #[allow(clippy::too_many_arguments)] @@ -616,6 +625,32 @@ impl ValidatorStore { } } + pub async fn sign_voluntary_exit( + &self, + validator_pubkey: PublicKeyBytes, + voluntary_exit: VoluntaryExit, + ) -> Result { + let signing_epoch = voluntary_exit.epoch; + let signing_context = self.signing_context(Domain::VoluntaryExit, signing_epoch); + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::VoluntaryExit(&voluntary_exit), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + metrics::inc_counter_vec(&metrics::SIGNED_VOLUNTARY_EXITS_TOTAL, &[metrics::SUCCESS]); + + Ok(SignedVoluntaryExit { + message: voluntary_exit, + signature, + }) + } + pub async fn sign_validator_registration_data( &self, validator_registration_data: ValidatorRegistrationData,