use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::network_context::RangeRequestId; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; use rand::{seq::SliceRandom, Rng}; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; use strum::IntoStaticStr; use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, /// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which /// case the responder will fill the response up to the max request size, assuming they have the /// bandwidth to do so. pub const EPOCHS_PER_BATCH: u64 = 1; /// The maximum number of batches to queue before requesting more. const BATCH_BUFFER_SIZE: u8 = 5; /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. /// /// Should be checked, since a failed chain must be removed. A chain that requested being removed /// and continued is now in an inconsistent state. pub type ProcessingResult = Result; /// Reasons for removing a chain #[derive(Debug)] pub enum RemoveChain { EmptyPeerPool, ChainCompleted, /// A chain has failed. This boolean signals whether the chain should be blacklisted. 
ChainFailed { blacklist: bool, failing_batch: BatchId, }, WrongBatchState(String), WrongChainState(String), } #[derive(Debug)] pub struct KeepChain; /// A chain identifier pub type ChainId = u64; pub type BatchId = Epoch; #[derive(Debug, Copy, Clone, IntoStaticStr)] pub enum SyncingChainType { Head, Finalized, Backfill, } /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the /// chain. pub struct SyncingChain { /// A random id used to identify this chain. id: ChainId, /// SyncingChain type pub chain_type: SyncingChainType, /// The start of the chain segment. Any epoch previous to this one has been validated. pub start_epoch: Epoch, /// The target head slot. pub target_head_slot: Slot, /// The target head root. pub target_head_root: Hash256, /// Sorted map of batches undergoing some kind of processing. batches: BTreeMap>, /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from, as well as the batches we are currently /// requesting. peers: FnvHashMap>, /// Starting epoch of the next batch that needs to be downloaded. to_be_downloaded: BatchId, /// Starting epoch of the batch that needs to be processed next. /// This is incremented as the chain advances. processing_target: BatchId, /// Optimistic head to sync. /// If a block is imported for this batch, the chain advances to this point. optimistic_start: Option, /// When a batch for an optimistic start is tried (either successful or not), it is stored to /// avoid trying it again due to chain stopping/re-starting on chain switching. attempted_optimistic_starts: HashSet, /// The current state of the chain. pub state: ChainSyncingState, /// The current processing batch, if any. current_processing_batch: Option, /// Batches validated by this chain. validated_batches: u64, /// The chain's log. 
log: slog::Logger, } #[derive(PartialEq, Debug)] pub enum ChainSyncingState { /// The chain is not being synced. Stopped, /// The chain is undergoing syncing. Syncing, } impl SyncingChain { pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 { let mut hasher = std::collections::hash_map::DefaultHasher::new(); (target_root, target_slot).hash(&mut hasher); hasher.finish() } #[allow(clippy::too_many_arguments)] pub fn new( start_epoch: Epoch, target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, chain_type: SyncingChainType, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); peers.insert(peer_id, Default::default()); let id = SyncingChain::::id(&target_head_root, &target_head_slot); SyncingChain { id, chain_type, start_epoch, target_head_slot, target_head_root, batches: BTreeMap::new(), peers, to_be_downloaded: start_epoch, processing_target: start_epoch, optimistic_start: None, attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, validated_batches: 0, log: log.new(o!("chain" => id)), } } /// Check if the chain has peers from which to process batches. pub fn available_peers(&self) -> usize { self.peers.len() } /// Get the chain's id. pub fn get_id(&self) -> ChainId { self.id } /// Peers currently syncing this chain. pub fn peers(&self) -> impl Iterator + '_ { self.peers.keys().cloned() } /// Progress in epochs made by the chain pub fn validated_epochs(&self) -> u64 { self.validated_batches * EPOCHS_PER_BATCH } /// Returns the total count of pending blocks in all the batches of this chain pub fn pending_blocks(&self) -> usize { self.batches .values() .map(|batch| batch.pending_blocks()) .sum() } /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. 
pub fn remove_peer( &mut self, peer_id: &PeerId, network: &mut SyncNetworkContext, ) -> ProcessingResult { if let Some(batch_ids) = self.peers.remove(peer_id) { // fail the batches. for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: id, }); } self.retry_batch_download(network, id)?; } else { debug!(self.log, "Batch not found while removing peer"; "peer" => %peer_id, "batch" => id) } } } if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) } else { Ok(KeepChain) } } /// Returns the latest slot number that has been processed. fn current_processed_slot(&self) -> Slot { // the last slot we processed was included in the previous batch, and corresponds to the // first slot of the current target epoch self.processing_target .start_slot(T::EthSpec::slots_per_epoch()) } /// A block has been received for a batch on this chain. /// If the block correctly completes the batch it will be processed if possible. pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, blocks: Vec>, ) -> ProcessingResult { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { None => { debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id); // A batch might get removed when the chain advances, so this is non fatal. return Ok(KeepChain); } Some(batch) => { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer, and that the // request_id matches if !batch.is_expecting_block(peer_id, &request_id) { return Ok(KeepChain); } batch } }; { // A stream termination has been sent. This batch has ended. Process a completed batch. 
// Remove the request from the peer's active batches self.peers .get_mut(peer_id) .map(|active_requests| active_requests.remove(&batch_id)); match batch.download_completed(blocks) { Ok(received) => { let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; debug!(self.log, "Batch downloaded"; "epoch" => batch_id, "blocks" => received, "batch_state" => self.visualize_batch_state(), "awaiting_batches" => awaiting_batches); // pre-emptively request more blocks from peers whilst we process current blocks, self.request_batches(network)?; self.process_completed_batches(network) } Err(result) => { let (expected_boundary, received_boundary, outcome) = result?; warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, "peer_id" => %peer_id, batch); if let BatchOperationOutcome::Failed { blacklist } = outcome { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } // this batch can't be used, so we need to request it again. self.retry_batch_download(network, batch_id) } } } } /// Processes the batch with the given id. /// The batch must exist and be ready for processing fn process_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { // Only process batches if this chain is Syncing, and only one at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { return Ok(KeepChain); } let Some(beacon_processor) = network.beacon_processor_if_enabled() else { return Ok(KeepChain); }; let Some(batch) = self.batches.get_mut(&batch_id) else { return Err(RemoveChain::WrongChainState(format!( "Trying to process a batch that does not exist: {}", batch_id ))); }; // NOTE: We send empty batches to the processor in order to trigger the block processor // result callback. 
This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. let (blocks, duration_in_awaiting_processing) = batch.start_processing()?; metrics::observe_duration( &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING, duration_in_awaiting_processing, ); let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks) { crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); // This is unlikely to happen but it would stall syncing since the batch now has no // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(KeepChain) } } /// Processes the next ready batch, prioritizing optimistic batches over the processing target. fn process_completed_batches( &mut self, network: &mut SyncNetworkContext, ) -> ProcessingResult { // Only process batches if this chain is Syncing and only process one batch at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { return Ok(KeepChain); } // Find the id of the batch we are going to process. // // First try our optimistic start, if any. If this batch is ready, we process it. If the // batch has not already been completed, check the current chain target. if let Some(epoch) = self.optimistic_start { if let Some(batch) = self.batches.get(&epoch) { let state = batch.state(); match state { BatchState::AwaitingProcessing(..) => { // this batch is ready debug!(self.log, "Processing optimistic start"; "epoch" => epoch); return self.process_batch(network, epoch); } BatchState::Downloading(..) 
=> { // The optimistic batch is being downloaded. We wait for this before // attempting to process other batches. return Ok(KeepChain); } BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should // have been removed // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. return Err(RemoveChain::WrongChainState(format!( "Optimistic batch indicates inconsistent chain state: {:?}", state ))); } BatchState::AwaitingValidation(_) => { // If an optimistic start is given to the chain after the corresponding // batch has been requested and processed we can land here. We drop the // optimistic candidate since we can't conclude whether the batch included // blocks or not at this point debug!(self.log, "Dropping optimistic candidate"; "batch" => epoch); self.optimistic_start = None; } } } } // if the optimistic target can't be processed, check the processing target if let Some(batch) = self.batches.get(&self.processing_target) { let state = batch.state(); match state { BatchState::AwaitingProcessing(..) => { return self.process_batch(network, self.processing_target); } BatchState::Downloading(..) => { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have beee removed // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. 
// - Processing -> `self.current_processing_batch` is None return Err(RemoveChain::WrongChainState(format!( "Robust target batch indicates inconsistent chain state: {:?}", state ))); } BatchState::AwaitingValidation(_) => { // we can land here if an empty optimistic batch succeeds processing and is // inside the download buffer (between `self.processing_target` and // `self.to_be_downloaded`). In this case, eventually the chain advances to the // batch (`self.processing_target` reaches this point). debug!(self.log, "Chain encountered a robust batch awaiting validation"; "batch" => self.processing_target); self.processing_target += EPOCHS_PER_BATCH; if self.to_be_downloaded <= self.processing_target { self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH; } self.request_batches(network)?; } } } else { return Err(RemoveChain::WrongChainState(format!( "Batch not found for current processing target {}", self.processing_target ))); } Ok(KeepChain) } /// The block processor has completed processing a batch. This function handles the result /// of the batch processor. pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, result: &BatchProcessResult, ) -> ProcessingResult { // the first two cases are possible if the chain advances while waiting for a processing // result let batch_state = self.visualize_batch_state(); let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); return Ok(KeepChain); } None => { debug!(self.log, "Chain was not expecting a batch result"; "batch_epoch" => batch_id); return Ok(KeepChain); } _ => { // batch_id matches, continue self.current_processing_batch = None; self.batches.get_mut(&batch_id).ok_or_else(|| { RemoveChain::WrongChainState(format!( "Current processing batch not found: {}", batch_id )) })? 
} }; let peer = batch.current_peer().cloned().ok_or_else(|| { RemoveChain::WrongBatchState(format!( "Processing target is in wrong state: {:?}", batch.state(), )) })?; // Log the process result and the batch for debugging purposes. debug!(self.log, "Batch processing result"; "result" => ?result, &batch, "batch_epoch" => batch_id, "client" => %network.client_type(&peer), "batch_state" => batch_state); // We consider three cases. Batch was successfully processed, Batch failed processing due // to a faulty peer, or batch failed processing but the peer can't be deemed faulty. match result { BatchProcessResult::Success { sent_blocks, imported_blocks, } => { if sent_blocks > imported_blocks { let ignored_blocks = sent_blocks - imported_blocks; metrics::inc_counter_vec_by( &metrics::SYNCING_CHAINS_IGNORED_BLOCKS, &[self.chain_type.into()], ignored_blocks as u64, ); } metrics::inc_counter_vec( &metrics::SYNCING_CHAINS_PROCESSED_BATCHES, &[self.chain_type.into()], ); batch.processing_completed(BatchProcessingResult::Success)?; // was not empty = sent_blocks > 0 if *sent_blocks > 0 { // If the processed batch was not empty, we can validate previous unvalidated // blocks. self.advance_chain(network, batch_id); // we register so that on chain switching we don't try it again self.attempted_optimistic_starts.insert(batch_id); } else if self.optimistic_start == Some(batch_id) { // check if this batch corresponds to an optimistic batch. 
In this case, we // reject it as an optimistic candidate since the batch was empty self.reject_optimistic_batch( network, false, /* do not re-request */ "batch was empty", )?; } if batch_id == self.processing_target { self.processing_target += EPOCHS_PER_BATCH; } // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed Err(RemoveChain::ChainCompleted) } else { // chain is not completed // attempt to request more batches self.request_batches(network)?; // attempt to process more batches self.process_completed_batches(network) } } BatchProcessResult::FaultyFailure { imported_blocks, penalty, } => { // Penalize the peer appropiately. network.report_peer(peer, *penalty, "faulty_batch"); // Check if this batch is allowed to continue match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { BatchOperationOutcome::Continue => { // Chain can continue. Check if it can be moved forward. if *imported_blocks > 0 { // At least one block was successfully verified and imported, so we can be sure all // previous batches are valid and we only need to download the current failed // batch. self.advance_chain(network, batch_id); } // Handle this invalid batch, that is within the re-process retries limit. self.handle_invalid_batch(network, batch_id) } BatchOperationOutcome::Failed { blacklist } => { // Check that we have not exceeded the re-process retry counter, // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and // report all peers. // There are some edge cases with forks that could land us in this situation. // This should be unlikely, so we tolerate these errors, but not often. warn!( self.log, "Batch failed to download. 
Dropping chain scoring peers"; "score_adjustment" => %penalty, "batch_epoch"=> batch_id, ); for (peer, _) in self.peers.drain() { network.report_peer(peer, *penalty, "faulty_chain"); } Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }) } } } BatchProcessResult::NonFaultyFailure => { batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; // Simply redownload the batch. self.retry_batch_download(network, batch_id) } } } fn reject_optimistic_batch( &mut self, network: &mut SyncNetworkContext, redownload: bool, reason: &str, ) -> ProcessingResult { if let Some(epoch) = self.optimistic_start.take() { self.attempted_optimistic_starts.insert(epoch); // if this batch is inside the current processing range, keep it, otherwise drop // it. NOTE: this is done to prevent non-sequential batches coming from optimistic // starts from filling up the buffer size if epoch < self.to_be_downloaded { debug!(self.log, "Rejected optimistic batch left for future use"; "epoch" => %epoch, "reason" => reason); // this batch is now treated as any other batch, and re-requested for future use if redownload { return self.retry_batch_download(network, epoch); } } else { debug!(self.log, "Rejected optimistic batch"; "epoch" => %epoch, "reason" => reason); self.batches.remove(&epoch); } } Ok(KeepChain) } /// Removes any batches previous to the given `validating_epoch` and updates the current /// boundaries of the chain. /// /// The `validating_epoch` must align with batch boundaries. /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. 
#[allow(clippy::modulo_one)] fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch <= self.start_epoch { return; } // safety check for batch boundaries if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH { crit!(self.log, "Validating Epoch is not aligned"); return; } // batches in the range [BatchId, ..) (not yet validated) let remaining_batches = self.batches.split_off(&validating_epoch); // batches less than `validating_epoch` let removed_batches = std::mem::replace(&mut self.batches, remaining_batches); for (id, batch) in removed_batches.into_iter() { self.validated_batches = self.validated_batches.saturating_add(1); // only for batches awaiting validation can we be sure the last attempt is // right, and thus, that any different attempt is wrong match batch.state() { BatchState::AwaitingValidation(ref processed_attempt) => { for attempt in batch.attempts() { // The validated batch has been re-processed if attempt.hash != processed_attempt.hash { // The re-downloaded version was different if processed_attempt.peer_id != attempt.peer_id { // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. let action = PeerAction::LowToleranceError; debug!(self.log, "Re-processed batch validated. Scoring original peer"; "batch_epoch" => id, "score_adjustment" => %action, "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id ); network.report_peer( attempt.peer_id, action, "batch_reprocessed_original_peer", ); } else { // The same peer corrected it's previous mistake. There was an error, so we // negative score the original peer. 
let action = PeerAction::MidToleranceError; debug!(self.log, "Re-processed batch validated by the same peer"; "batch_epoch" => id, "score_adjustment" => %action, "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id ); network.report_peer( attempt.peer_id, action, "batch_reprocessed_same_peer", ); } } } } BatchState::Downloading(peer, ..) => { // remove this batch from the peer's active requests if let Some(active_batches) = self.peers.get_mut(peer) { active_batches.remove(&id); } } BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => crit!( self.log, "batch indicates inconsistent chain state while advancing chain" ), BatchState::AwaitingProcessing(..) => {} BatchState::Processing(_) => { debug!(self.log, "Advancing chain while processing a batch"; "batch" => id, batch); if let Some(processing_id) = self.current_processing_batch { if id <= processing_id { self.current_processing_batch = None; } } } } } self.processing_target = self.processing_target.max(validating_epoch); let old_start = self.start_epoch; self.start_epoch = validating_epoch; self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch); if self.batches.contains_key(&self.to_be_downloaded) { // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we // won't have this batch, so we need to request it. self.to_be_downloaded += EPOCHS_PER_BATCH; } if let Some(epoch) = self.optimistic_start { if epoch <= validating_epoch { self.optimistic_start = None; } } debug!(self.log, "Chain advanced"; "previous_start" => old_start, "new_start" => self.start_epoch, "processing_target" => self.processing_target); } /// An invalid batch has been received that could not be processed, but that can be retried. /// /// These events occur when a peer has successfully responded with blocks, but the blocks we /// have received are incorrect or invalid. This indicates the peer has not performed as /// intended and can result in downvoting a peer. 
fn handle_invalid_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { // The current batch could not be processed, indicating either the current or previous // batches are invalid. // The previous batch could be incomplete due to the block sizes being too large to fit in // a single RPC request or there could be consecutive empty batches which are not supposed // to be there // The current (sub-optimal) strategy is to simply re-request all batches that could // potentially be faulty. If a batch returns a different result than the original and // results in successful processing, we downvote the original peer that sent us the batch. if let Some(epoch) = self.optimistic_start { // If this batch is an optimistic batch, we reject this epoch as an optimistic // candidate and try to re download it if epoch == batch_id { return self.reject_optimistic_batch(network, true, "batch was invalid"); // since this is the optimistic batch, we can't consider previous batches as // invalid. } } // this is our robust `processing_target`. All previous batches must be awaiting // validation let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { // remove the chain early return Err(RemoveChain::ChainFailed { blacklist, failing_batch: *id, }); } redownload_queue.push(*id); } // no batch maxed out it process attempts, so now the chain's volatile progress must be // reset self.processing_target = self.start_epoch; for id in redownload_queue { self.retry_batch_download(network, id)?; } // finally, re-request the failed batch. self.retry_batch_download(network, batch_id) } pub fn stop_syncing(&mut self) { self.state = ChainSyncingState::Stopped; } /// Either a new chain, or an old one with a peer list /// This chain has been requested to start syncing. /// /// This could be new chain, or an old chain that is being resumed. 
pub fn start_syncing( &mut self, network: &mut SyncNetworkContext, local_finalized_epoch: Epoch, optimistic_start_epoch: Epoch, ) -> ProcessingResult { // to avoid dropping local progress, we advance the chain wrt its batch boundaries. This let align = |epoch| { // start_epoch + (number of batches in between)*length_of_batch self.start_epoch + ((epoch - self.start_epoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH }; // get the *aligned* epoch that produces a batch containing the `local_finalized_epoch` let validating_epoch = align(local_finalized_epoch); // align the optimistic_start too. let optimistic_epoch = align(optimistic_start_epoch); // advance the chain to the new validating epoch self.advance_chain(network, validating_epoch); if self.optimistic_start.is_none() && optimistic_epoch > self.processing_target && !self.attempted_optimistic_starts.contains(&optimistic_epoch) { self.optimistic_start = Some(optimistic_epoch); } // update the state self.state = ChainSyncingState::Syncing; // begin requesting blocks from the peer pool, until all peers are exhausted. self.request_batches(network)?; // start processing batches if needed self.process_completed_batches(network) } /// Add a peer to the chain. /// /// If the chain is active, this starts requesting batches from this peer. pub fn add_peer( &mut self, network: &mut SyncNetworkContext, peer_id: PeerId, ) -> ProcessingResult { // add the peer without overwriting its active requests if self.peers.entry(peer_id).or_default().is_empty() { // Either new or not, this peer is idle, try to request more batches self.request_batches(network) } else { Ok(KeepChain) } } /// An RPC error has occurred. /// /// If the batch exists it is re-requested. 
pub fn inject_error( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer if !batch.is_expecting_block(peer_id, &request_id) { debug!( self.log, "Batch not expecting block"; "batch_epoch" => batch_id, "batch_state" => ?batch.state(), "peer_id" => %peer_id, "request_id" => %request_id, "batch_state" => batch_state ); return Ok(KeepChain); } debug!( self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id, "batch_state" => ?batch.state(), "peer_id" => %peer_id, "request_id" => %request_id, "batch_state" => batch_state ); if let Some(active_requests) = self.peers.get_mut(peer_id) { active_requests.remove(&batch_id); } if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } self.retry_batch_download(network, batch_id) } else { debug!( self.log, "Batch not found"; "batch_epoch" => batch_id, "peer_id" => %peer_id, "request_id" => %request_id, "batch_state" => batch_state ); // this could be an error for an old batch, removed when the chain advances Ok(KeepChain) } } /// Sends and registers the request of a batch awaiting download. 
pub fn retry_batch_download( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { let Some(batch) = self.batches.get_mut(&batch_id) else { return Ok(KeepChain); }; // Find a peer to request the batch let failed_peers = batch.failed_peers(); let new_peer = self .peers .iter() .map(|(peer, requests)| { ( failed_peers.contains(peer), requests.len(), rand::thread_rng().gen::(), *peer, ) }) // Sort peers prioritizing unrelated peers with less active requests. .min() .map(|(_, _, _, peer)| peer); if let Some(peer) = new_peer { self.send_batch(network, batch_id, peer) } else { // If we are here the chain has no more peers Err(RemoveChain::EmptyPeerPool) } } /// Requests the batch assigned to the given id from a given peer. pub fn send_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, peer: PeerId, ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); match network.blocks_and_blobs_by_range_request( peer, batch_type, request, RangeRequestId::RangeSync { chain_id: self.id, batch_id, }, ) { Ok(request_id) => { // inform the batch about the new request batch.start_downloading_from_peer(peer, request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) .unwrap_or(false) { debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch, "batch_state" => batch_state); } else { debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch, "batch_state" => batch_state); } // register the batch for this peer return self .peers .get_mut(&peer) .map(|requests| { requests.insert(batch_id); Ok(KeepChain) }) .unwrap_or_else(|| { Err(RemoveChain::WrongChainState(format!( "Sending batch to a peer that is not in the chain: {}", peer ))) }); } Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch 
request"; "batch_id" => batch_id, "error" => ?e, &batch); // register the failed download and check if the batch can be retried batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); match batch.download_failed(true)? { BatchOperationOutcome::Failed { blacklist } => { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }) } BatchOperationOutcome::Continue => { return self.retry_batch_download(network, batch_id) } } } } } Ok(KeepChain) } /// Returns true if this chain is currently syncing. pub fn is_syncing(&self) -> bool { match self.state { ChainSyncingState::Syncing => true, ChainSyncingState::Stopped => false, } } /// Kickstarts the chain by sending for processing batches that are ready and requesting more /// batches if needed. pub fn resume( &mut self, network: &mut SyncNetworkContext, ) -> Result { // Request more batches if needed. self.request_batches(network)?; // If there is any batch ready for processing, send it. self.process_completed_batches(network) } /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. fn request_batches(&mut self, network: &mut SyncNetworkContext) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); } // find the next pending batch and request it from the peer // randomize the peers for load balancing let mut rng = rand::thread_rng(); let mut idle_peers = self .peers .iter() .filter_map(|(peer, requests)| { if requests.is_empty() { Some(*peer) } else { None } }) .collect::>(); idle_peers.shuffle(&mut rng); // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. 
if let Some(epoch) = self.optimistic_start { if let Entry::Vacant(entry) = self.batches.entry(epoch) { if let Some(peer) = idle_peers.pop() { let batch_type = network.batch_type(epoch); let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); entry.insert(optimistic_batch); self.send_batch(network, epoch, peer)?; } } return Ok(KeepChain); } while let Some(peer) = idle_peers.pop() { if let Some(batch_id) = self.include_next_batch(network) { // send the batch self.send_batch(network, batch_id, peer)?; } else { // No more batches, simply stop return Ok(KeepChain); } } Ok(KeepChain) } /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond the target head slot if self .to_be_downloaded .start_slot(T::EthSpec::slots_per_epoch()) >= self.target_head_slot { return None; } // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. let in_buffer = |batch: &BatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
) }; if self .batches .iter() .filter(|&(_epoch, batch)| in_buffer(batch)) .count() > BATCH_BUFFER_SIZE as usize { return None; } let batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch match self.batches.entry(batch_id) { Entry::Occupied(_) => { // this batch doesn't need downloading, let this same function decide the next batch self.to_be_downloaded += EPOCHS_PER_BATCH; self.include_next_batch(network) } Entry::Vacant(entry) => { let batch_type = network.batch_type(batch_id); entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type)); self.to_be_downloaded += EPOCHS_PER_BATCH; Some(batch_id) } } } /// Creates a string visualization of the current state of the chain, to make it easier for debugging and understanding /// where sync is up to from glancing at the logs. /// /// This produces a string of the form: [D,E,E,E,E] /// to indicate the current buffer state of the chain. The symbols are defined on each of the /// batch states. See [BatchState::visualize] for symbol definitions. 
///
/// The output always holds exactly `BATCH_BUFFER_SIZE` comma-separated symbols with no
/// trailing separator. Slots are read in order starting at `processing_target`; once a slot
/// has no batch, that slot and every later slot are rendered as `E`, even if a batch further
/// ahead already exists.
fn visualize_batch_state(&self) -> String {
    let buffer_len = usize::from(BATCH_BUFFER_SIZE);

    // Symbols for the contiguous run of existing batches starting at the processing target,
    // padded with 'E' (batch not created yet) up to the buffer length.
    let symbols = (0..u64::from(BATCH_BUFFER_SIZE))
        .map_while(|batch_index| {
            self.batches
                .get(&(self.processing_target + batch_index * EPOCHS_PER_BATCH))
                .map(|batch| batch.visualize())
        })
        .chain(std::iter::repeat('E'))
        .take(buffer_len);

    // Two brackets, one symbol per slot and a separator between neighbouring slots.
    // Widen to usize before multiplying so a larger buffer size cannot overflow `u8`.
    let mut visualization_string = String::with_capacity(2 * buffer_len + 1);
    visualization_string.push('[');
    for (position, symbol) in symbols.enumerate() {
        // Commas go only *between* elements; the previous version also appended one after
        // the last element whenever every slot held a batch.
        if position > 0 {
            visualization_string.push(',');
        }
        visualization_string.push(symbol);
    }
    visualization_string.push(']');
    visualization_string
}
}

/// Allows a `&mut SyncingChain` to be used directly as slog key/value context by delegating
/// to the `SyncingChain` implementation below.
impl<T: BeaconChainTypes> slog::KV for &mut SyncingChain<T> {
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        slog::KV::serialize(*self, record, serializer)
    }
}

/// Emits a summary of the chain as structured log fields: `id`, `from` (start epoch), `to`
/// (epoch of the target head slot), `end_root`, `current_target` (processing target),
/// `batches`, `peers`, `validated_batches` and `state`.
impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        use slog::Value;
        serializer.emit_u64("id", self.id)?;
        Value::serialize(&self.start_epoch, record, "from", serializer)?;
        Value::serialize(
            &self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()),
            record,
            "to",
            serializer,
        )?;
        serializer.emit_arguments("end_root", &format_args!("{}", self.target_head_root))?;
        Value::serialize(
            &self.processing_target,
            record,
            "current_target",
            serializer,
        )?;
        serializer.emit_usize("batches", self.batches.len())?;
        serializer.emit_usize("peers", self.peers.len())?;
        serializer.emit_u64("validated_batches", self.validated_batches)?;
        serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
        slog::Result::Ok(())
    }
}

use super::batch::WrongState as WrongBatchState;

/// Converts a batch `WrongState` error into `RemoveChain::WrongBatchState`, keeping the
/// error's inner message.
impl From<WrongBatchState> for RemoveChain {
    fn from(err: WrongBatchState) -> Self {
        RemoveChain::WrongBatchState(err.0)
    }
}

impl RemoveChain {
    /// Returns `true` for the `WrongBatchState` and `WrongChainState` variants, i.e. removals
    /// caused by the chain or one of its batches being in an unexpected state, as opposed to
    /// `EmptyPeerPool`, `ChainCompleted` or `ChainFailed`.
    pub fn is_critical(&self) -> bool {
        matches!(
            self,
            RemoveChain::WrongBatchState(..) | RemoveChain::WrongChainState(..)
        )
    }
}

/// Maps a range sync type onto the matching syncing chain type. `SyncingChainType::Backfill`
/// has no `RangeSyncType` counterpart and is never produced by this conversion.
impl From<RangeSyncType> for SyncingChainType {
    fn from(value: RangeSyncType) -> Self {
        match value {
            RangeSyncType::Head => Self::Head,
            RangeSyncType::Finalized => Self::Finalized,
        }
    }
}