Do not reset batch ids & redownload out of range batches (#1528)

The changes are somewhat simple but should solve two issues:
- When quickly changing between chains once and a second time back again, batchIds would collide and cause havoc. 
- If we got an out of range response from a peer, sync would remain in syncing but without advancing

Changes:
- remove the batch id. Identify each batch (inside a chain) by its starting epoch. Target epochs for downloading and processing now advance by EPOCHS_PER_BATCH
- for the same reason, move the "to_be_downloaded_id" to be an epoch
- remove a sneaky line that dropped an out of range batch without downloading it
- bonus: put the chain_id in the log given to the chain. This is why explicitly logging the chain_id is removed
This commit is contained in:
divma
2020-08-18 01:29:51 +00:00
parent 9a97a0b14f
commit 46dbf027af
9 changed files with 116 additions and 180 deletions

View File

@@ -1,4 +1,4 @@
use super::batch::{Batch, BatchId, PendingBatches};
use super::batch::{Batch, PendingBatches};
use crate::beacon_processor::ProcessId;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::RequestId;
@@ -73,11 +73,12 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// and thus available to download this chain from.
pub peer_pool: HashSet<PeerId>,
/// The next batch_id that needs to be downloaded.
to_be_downloaded_id: BatchId,
/// Starting epoch of the next batch that needs to be downloaded.
to_be_downloaded: Epoch,
/// The next batch id that needs to be processed.
to_be_processed_id: BatchId,
/// Starting epoch of the batch that needs to be processed next.
/// This is incremented as the chain advances.
processing_target: Epoch,
/// The current state of the chain.
pub state: ChainSyncingState,
@@ -91,7 +92,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// A reference to the sync logger.
/// The chain's log.
log: slog::Logger,
}
@@ -127,8 +128,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
completed_batches: Vec::new(),
processed_batches: Vec::new(),
peer_pool,
to_be_downloaded_id: BatchId(1),
to_be_processed_id: BatchId(1),
to_be_downloaded: start_epoch,
processing_target: start_epoch,
state: ChainSyncingState::Stopped,
current_processing_batch: None,
beacon_processor_send,
@@ -139,13 +140,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Returns the latest slot number that has been processed.
fn current_processed_slot(&self) -> Slot {
self.start_epoch
// the last slot we processed was included in the previous batch, and corresponds to the
// first slot of the current target epoch
self.processing_target
.start_slot(T::EthSpec::slots_per_epoch())
.saturating_add(
self.to_be_processed_id.saturating_sub(1u64)
* T::EthSpec::slots_per_epoch()
* EPOCHS_PER_BATCH,
)
}
/// A batch of blocks has been received. This function gets run on all chains and should
@@ -182,21 +180,19 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// An entire batch of blocks has been received. This functions checks to see if it can be processed,
// remove any batches waiting to be verified and if this chain is syncing, request new
// blocks for the peer.
debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
debug!(self.log, "Completed batch received"; "epoch" => batch.start_epoch, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
// verify the range of received blocks
// Note that the order of blocks is verified in block processing
if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) {
// the batch is non-empty
let first_slot = batch.downloaded_blocks[0].slot();
if batch.start_slot > first_slot || batch.end_slot < last_slot {
if batch.start_slot() > first_slot || batch.end_slot() < last_slot {
warn!(self.log, "BlocksByRange response returned out of range blocks";
"response_initial_slot" => first_slot,
"requested_initial_slot" => batch.start_slot);
// This is a pretty bad error. We don't consider this fatal, but we don't tolerate
// this much either.
network.report_peer(batch.current_peer, PeerAction::LowToleranceError);
self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches
"response_initial_slot" => first_slot,
"requested_initial_slot" => batch.start_slot());
// this batch can't be used, so we need to request it again.
self.failed_batch(network, batch);
return;
}
}
@@ -242,7 +238,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Check if there is a batch ready to be processed
if !self.completed_batches.is_empty()
&& self.completed_batches[0].id == self.to_be_processed_id
&& self.completed_batches[0].start_epoch == self.processing_target
{
let batch = self.completed_batches.remove(0);
@@ -258,7 +254,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Sends a batch to the beacon processor for async processing in a queue.
fn process_batch(&mut self, mut batch: Batch<T::EthSpec>) {
let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new());
let process_id = ProcessId::RangeBatchId(self.id, batch.id);
let process_id = ProcessId::RangeBatchId(self.id, batch.start_epoch);
self.current_processing_batch = Some(batch);
if let Err(e) = self
@@ -280,7 +276,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
chain_id: ChainId,
batch_id: BatchId,
batch_start_epoch: Epoch,
downloaded_blocks: &mut Option<Vec<SignedBeaconBlock<T::EthSpec>>>,
result: &BatchProcessResult,
) -> Option<ProcessingResult> {
@@ -289,14 +285,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
match &self.current_processing_batch {
Some(current_batch) if current_batch.id != batch_id => {
Some(current_batch) if current_batch.start_epoch != batch_start_epoch => {
debug!(self.log, "Unexpected batch result";
"chain_id" => self.id, "batch_id" => *batch_id, "expected_batch_id" => *current_batch.id);
"batch_epoch" => batch_start_epoch, "expected_batch_epoch" => current_batch.start_epoch);
return None;
}
None => {
debug!(self.log, "Chain was not expecting a batch result";
"chain_id" => self.id, "batch_id" => *batch_id);
"batch_epoch" => batch_start_epoch);
return None;
}
_ => {
@@ -308,7 +304,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let downloaded_blocks = downloaded_blocks.take().or_else(|| {
// if taken by another chain, we are no longer waiting on a result.
self.current_processing_batch = None;
crit!(self.log, "Processed batch taken by another chain"; "chain_id" => self.id);
crit!(self.log, "Processed batch taken by another chain");
None
})?;
@@ -318,16 +314,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.downloaded_blocks = downloaded_blocks;
// double check batches are processed in order TODO: Remove for prod
if batch.id != self.to_be_processed_id {
if batch.start_epoch != self.processing_target {
crit!(self.log, "Batch processed out of order";
"chain_id" => self.id,
"processed_batch_id" => *batch.id,
"expected_id" => *self.to_be_processed_id);
"processed_starting_epoch" => batch.start_epoch,
"expected_epoch" => self.processing_target);
}
let res = match result {
BatchProcessResult::Success => {
*self.to_be_processed_id += 1;
self.processing_target += EPOCHS_PER_BATCH;
// If the processed batch was not empty, we can validate previous invalidated
// blocks including the current batch.
@@ -357,7 +352,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
BatchProcessResult::Partial => {
warn!(self.log, "Batch processing failed but at least one block was imported";
"chain_id" => self.id, "id" => *batch.id, "peer" => format!("{}", batch.current_peer)
"batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string()
);
// At least one block was successfully verified and imported, so we can be sure all
// previous batches are valid and we only need to download the current failed
@@ -375,7 +370,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
"batch_epoch"=> batch.start_epoch);
for peer_id in self.peer_pool.drain() {
network.report_peer(peer_id, action);
}
@@ -388,7 +383,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
BatchProcessResult::Failed => {
debug!(self.log, "Batch processing failed";
"chain_id" => self.id,"id" => *batch.id, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string());
"batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string());
// The batch processing failed
// This could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
@@ -403,7 +398,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
"batch_epoch" => batch.start_epoch);
for peer_id in self.peer_pool.drain() {
network.report_peer(peer_id, action);
}
@@ -433,11 +428,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) {
while !self.processed_batches.is_empty() {
let mut processed_batch = self.processed_batches.remove(0);
if *processed_batch.id >= *last_batch.id {
if processed_batch.start_epoch >= last_batch.start_epoch {
crit!(self.log, "A processed batch had a greater id than the current process id";
"chain_id" => self.id,
"processed_id" => *processed_batch.id,
"current_id" => *last_batch.id);
"processed_start_epoch" => processed_batch.start_epoch,
"current_start_epoch" => last_batch.start_epoch);
}
// Go through passed attempts and downscore peers that returned invalid batches
@@ -452,11 +446,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
debug!(
self.log, "Re-processed batch validated. Scoring original peer";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
"batch_epoch" => processed_batch.start_epoch,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(attempt.peer_id, action);
} else {
@@ -465,11 +458,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::MidToleranceError;
debug!(
self.log, "Re-processed batch validated by the same peer.";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
"batch_epoch" => processed_batch.start_epoch,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(attempt.peer_id, action);
}
@@ -508,7 +500,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Find any pre-processed batches awaiting validation
while !self.processed_batches.is_empty() {
let past_batch = self.processed_batches.remove(0);
*self.to_be_processed_id = std::cmp::min(*self.to_be_processed_id, *past_batch.id);
self.processing_target = std::cmp::min(self.processing_target, past_batch.start_epoch);
self.reprocess_batch(network, past_batch);
}
@@ -552,11 +544,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.current_peer = new_peer.clone();
debug!(self.log, "Re-requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer),
"batch_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string(),
"retries" => batch.retries,
"re-processes" => batch.reprocess_retries);
self.send_batch(network, batch);
@@ -592,12 +583,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.start_epoch = local_finalized_epoch;
debug!(self.log, "Updating chain's progress";
"chain_id" => self.id,
"prev_completed_slot" => current_processed_slot,
"new_completed_slot" => self.current_processed_slot());
// Re-index batches
*self.to_be_downloaded_id = 1;
*self.to_be_processed_id = 1;
self.to_be_downloaded = local_finalized_epoch;
self.processing_target = local_finalized_epoch;
// remove any completed or processed batches
self.completed_batches.clear();
@@ -621,7 +611,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// do not request blocks if the chain is not syncing
if let ChainSyncingState::Stopped = self.state {
debug!(self.log, "Peer added to a non-syncing chain";
"chain_id" => self.id, "peer_id" => format!("{}", peer_id));
"peer_id" => format!("{}", peer_id));
return;
}
@@ -650,8 +640,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> Option<ProcessingResult> {
if let Some(batch) = self.pending_batches.remove(request_id) {
debug!(self.log, "Batch failed. RPC Error";
"chain_id" => self.id,
"id" => *batch.id,
"batch_epoch" => batch.start_epoch,
"retries" => batch.retries,
"peer" => format!("{:?}", peer_id));
@@ -688,12 +677,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.current_peer = new_peer.clone();
debug!(self.log, "Re-Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{:?}", batch.current_peer));
"batch_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string());
self.send_batch(network, batch);
ProcessingResult::KeepChain
}
@@ -714,10 +701,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(peer_id) = self.get_next_peer() {
if let Some(batch) = self.get_next_batch(peer_id) {
debug!(self.log, "Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"batch_epoch" => batch.start_epoch,
"peer" => format!("{}", batch.current_peer));
// send the batch
self.send_batch(network, batch);
@@ -770,22 +756,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
// One is added to the start slot to begin one slot after the epoch boundary
let batch_start_slot = self
.start_epoch
.start_slot(slots_per_epoch)
.saturating_add(1u64)
+ self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch;
// don't request batches beyond the target head slot
if batch_start_slot > self.target_head_slot {
if self.to_be_downloaded.start_slot(slots_per_epoch) > self.target_head_slot {
return None;
}
// truncate the batch to the epoch containing the target head of the chain
let batch_end_slot = std::cmp::min(
// request either a batch containing the max number of blocks per batch
batch_start_slot + blocks_per_batch,
self.to_be_downloaded.start_slot(slots_per_epoch) + blocks_per_batch + 1,
// or a batch of one epoch of blocks, which contains the `target_head_slot`
self.target_head_slot
.saturating_add(slots_per_epoch)
@@ -793,28 +772,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.start_slot(slots_per_epoch),
);
let batch_id = self.to_be_downloaded_id;
// Find the next batch id. The largest of the next sequential id, or the next uncompleted
// id
let max_completed_id = self
.completed_batches
.iter()
.last()
.map(|x| x.id.0)
.unwrap_or_else(|| 0);
// TODO: Check if this is necessary
self.to_be_downloaded_id = BatchId(std::cmp::max(
self.to_be_downloaded_id.0 + 1,
max_completed_id + 1,
));
Some(Batch::new(
batch_id,
batch_start_slot,
batch_end_slot,
peer_id,
))
let batch = Some(Batch::new(self.to_be_downloaded, batch_end_slot, peer_id));
self.to_be_downloaded += EPOCHS_PER_BATCH;
batch
}
/// Requests the provided batch from the provided peer.
@@ -832,14 +792,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
Err(e) => {
warn!(self.log, "Batch request failed";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer),
"retries" => batch.retries,
"error" => e,
"re-processes" => batch.reprocess_retries);
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"start_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string(),
"retries" => batch.retries,
"error" => e,
"re-processes" => batch.reprocess_retries);
self.failed_batch(network, batch);
}
}