From 113758a4f573ca8d9ae6d28b4c58fcf1592d5de7 Mon Sep 17 00:00:00 2001 From: divma Date: Mon, 5 Oct 2020 04:02:09 +0000 Subject: [PATCH] From panic to crit (#1726) ## Issue Addressed Downgrade inconsistent chain segment states from `panic` to `crit`. I don't love this solution but since range can always bounce back from any of those, we don't panic. Co-authored-by: Age Manning --- .../network/src/sync/range_sync/batch.rs | 81 +++++++++++++------ .../network/src/sync/range_sync/chain.rs | 54 ++++++------- 2 files changed, 85 insertions(+), 50 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index aa863576fc..b51b20897c 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,6 +1,7 @@ use crate::sync::RequestId; use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; +use slog::{crit, warn, Logger}; use ssz::Encode; use std::collections::HashSet; use std::hash::{Hash, Hasher}; @@ -125,13 +126,17 @@ impl BatchInfo { } /// Adds a block to a downloading batch. - pub fn add_block(&mut self, block: SignedBeaconBlock) { + pub fn add_block(&mut self, block: SignedBeaconBlock, logger: &Logger) { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); self.state = BatchState::Downloading(peer, blocks, req_id) } - other => unreachable!("Add block for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Add block for batch in wrong state"; "state" => ?other); + self.state = other + } } } @@ -140,14 +145,8 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed( &mut self, - ) -> Result< - usize, /* Received blocks */ - ( - Slot, /* expected slot */ - Slot, /* received slot */ - &BatchState, - ), - > { + logger: &Logger, + ) -> Result> { match self.state.poison() { BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range @@ -163,7 +162,7 @@ impl BatchInfo { None }; - if let Some(range) = failed_range { + if let Some((expected, received)) = failed_range { // this is a failed download, register the attempt and check if the batch // can be tried again self.failed_download_attempts.push(peer); @@ -175,7 +174,9 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - return Err((range.0, range.1, &self.state)); + warn!(logger, "Batch received out of range blocks"; + &self, "expected" => expected, "received" => received); + return Err(&self.state); } } @@ -183,12 +184,17 @@ impl BatchInfo { self.state = BatchState::AwaitingProcessing(peer, blocks); Ok(received) } - other => unreachable!("Download completed for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Download completed for batch in wrong state"; "state" => ?other); + self.state = other; + Err(&self.state) + } } } #[must_use = "Batch may have failed"] - pub fn download_failed(&mut self) -> &BatchState { + pub fn download_failed(&mut self, logger: &Logger) -> &BatchState { match self.state.poison() { BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again @@ -203,31 +209,50 @@ impl BatchInfo { }; &self.state } - other => unreachable!("Download failed for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Download failed for batch in wrong state"; "state" => ?other); + self.state = other; + &self.state + } } } - pub fn start_downloading_from_peer(&mut self, peer: PeerId, request_id: RequestId) { + pub fn start_downloading_from_peer( + &mut self, + peer: PeerId, + request_id: RequestId, + logger: &Logger, + ) { match self.state.poison() { BatchState::AwaitingDownload => { self.state = BatchState::Downloading(peer, Vec::new(), request_id); } - other => unreachable!("Starting download for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Starting download for batch in wrong state"; "state" => ?other); + self.state = other + } } } - pub fn start_processing(&mut self) -> Vec> { + pub fn start_processing(&mut self, logger: &Logger) -> Vec> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new(peer, &blocks)); blocks } - other => unreachable!("Start processing for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Starting procesing batch in wrong state"; "state" => ?other); + self.state = other; + vec![] + } } } #[must_use = "Batch may have failed"] - pub fn processing_completed(&mut self, was_sucessful: bool) -> &BatchState { + pub fn processing_completed(&mut self, was_sucessful: bool, logger: &Logger) -> &BatchState { match self.state.poison() { BatchState::Processing(attempt) => { self.state = if !was_sucessful { @@ -247,12 +272,17 @@ impl BatchInfo { }; &self.state } - other => unreachable!("Processing completed for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Procesing completed for batch in wrong state"; "state" => ?other); + self.state = other; + &self.state + } } } #[must_use = "Batch may have failed"] - pub fn validation_failed(&mut self) -> &BatchState { + pub fn validation_failed(&mut self, logger: &Logger) -> &BatchState { match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); @@ -267,7 +297,12 @@ impl BatchInfo { }; &self.state } - other => unreachable!("Validation failed for batch in wrong state: {:?}", other), + BatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + crit!(logger, "Validation failed for batch in wrong state"; "state" => ?other); + self.state = other; + &self.state + } } } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 755f9e0e00..8ed21616d5 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -30,7 +30,6 @@ const BATCH_BUFFER_SIZE: u8 = 5; #[derive(PartialEq)] #[must_use = "Should be checked, since a failed chain must be removed. A chain that requested being removed and continued is now in an inconsistent state"] - pub enum ProcessingResult { KeepChain, RemoveChain, @@ -168,7 +167,7 @@ impl SyncingChain { .batches .get_mut(&id) .expect("registered batch exists") - .download_failed() + .download_failed(&self.log) { return ProcessingResult::RemoveChain; } @@ -229,18 +228,17 @@ impl SyncingChain { if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - batch.add_block(block); + batch.add_block(block, &self.log); ProcessingResult::KeepChain } else { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches - self.peers .get_mut(peer_id) .unwrap_or_else(|| panic!("Batch is registered for the peer")) .remove(&batch_id); - match batch.download_completed() { + match batch.download_completed(&self.log) { Ok(received) => { let awaiting_batches = batch_id.saturating_sub( self.optimistic_start @@ -254,9 +252,7 @@ impl SyncingChain { } self.process_completed_batches(network) } - Err((expected, received, state)) => { - warn!(self.log, "Batch received out of range blocks"; - "epoch" => batch_id, "expected" => expected, "received" => received); + Err(state) => { if let BatchState::Failed = state { return ProcessingResult::RemoveChain; } @@ -285,7 +281,7 @@ impl SyncingChain { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = batch.start_processing(); + let blocks = batch.start_processing(&self.log); let process_id = ProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -299,7 +295,6 @@ impl SyncingChain { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - // TODO: needs better handling self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) } else { ProcessingResult::KeepChain @@ -337,25 +332,29 @@ impl SyncingChain { BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed - | BatchState::Poisoned - | BatchState::AwaitingValidation(_) => { + | BatchState::Poisoned => { // these are all inconsistent states: - // - Processing -> `self.current_processing_batch` is Some - // - Failed -> non recoverable batch. For a optimistic batch, it should + // - Processing -> `self.current_processing_batch` is None + // - Failed -> non recoverable batch. For an optimistic batch, it should // have been removed // - Poisoned -> this is an intermediate state that should never be reached // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. - // - AwaitingValidation -> If an optimistic batch is successfully processed - // it is no longer considered an optimistic candidate. If the batch was - // empty the chain rejects it; if it was non empty the chain is advanced - // to this point (so that the old optimistic batch is now the processing - // target) unreachable!( "Optimistic batch indicates inconsistent chain state: {:?}", state ) } + BatchState::AwaitingValidation(_) => { + // This is possible due to race conditions, and tho it would be considered + // an inconsistent state, the chain can continue. If an optimistic batch + // is successfully processed it is no longer considered an optimistic + // candidate. If the batch was empty the chain rejects it; if it was non + // empty the chain is advanced to this point (so that the old optimistic + // batch is now the processing target) + crit!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch); + None + } } } else { None @@ -385,7 +384,7 @@ impl SyncingChain { // re-requested. // - AwaitingValidation -> self.processing_target should have been moved // forward - // - Processing -> `self.current_processing_batch` is Some + // - Processing -> `self.current_processing_batch` is None // - Poisoned -> Intermediate state that should never be reached unreachable!( "Robust target batch indicates inconsistent chain state: {:?}", @@ -441,7 +440,7 @@ impl SyncingChain { .batches .get_mut(&batch_id) .expect("Chain was expecting a known batch"); - let _ = batch.processing_completed(true); + let _ = batch.processing_completed(true, &self.log); // If the processed batch was not empty, we can validate previous unvalidated // blocks. if *was_non_empty { @@ -489,7 +488,7 @@ impl SyncingChain { .expect("batch is processing blocks from a peer"); debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - if let BatchState::Failed = batch.processing_completed(false) { + if let BatchState::Failed = batch.processing_completed(false, &self.log) { // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches @@ -566,6 +565,7 @@ impl SyncingChain { // safety check for batch boundaries if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH { crit!(self.log, "Validating Epoch is not aligned"); + return; } // batches in the range [BatchId, ..) (not yet validated) @@ -690,7 +690,7 @@ impl SyncingChain { let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { - if let BatchState::Failed = batch.validation_failed() { + if let BatchState::Failed = batch.validation_failed(&self.log) { // remove the chain early return ProcessingResult::RemoveChain; } @@ -804,7 +804,7 @@ impl SyncingChain { .get_mut(peer_id) .expect("Peer belongs to the chain") .remove(&batch_id); - if let BatchState::Failed = batch.download_failed() { + if let BatchState::Failed = batch.download_failed(&self.log) { return ProcessingResult::RemoveChain; } self.retry_batch_download(network, batch_id) @@ -859,7 +859,7 @@ impl SyncingChain { match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone(), request_id); + batch.start_downloading_from_peer(peer.clone(), request_id, &self.log); if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -881,12 +881,12 @@ impl SyncingChain { warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer.clone(), 1); // fake request_id is not relevant + batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant self.peers .get_mut(&peer) .expect("peer belongs to the peer pool") .remove(&batch_id); - if let BatchState::Failed = batch.download_failed() { + if let BatchState::Failed = batch.download_failed(&self.log) { return ProcessingResult::RemoveChain; } else { return self.retry_batch_download(network, batch_id);