Sync update (#1412)

## Issue Addressed

Recurring sync loop and invalid batch downloading

## Proposed Changes

Shifts the batches to include the first slot of each epoch. This ensures the finalized is always downloaded once a chain has completed syncing. 

Also add in logic to prevent re-dialing disconnected peers. Non-performant peers get disconnected during sync, this prevents re-connection to these during sync. 

## Additional Info

N/A
This commit is contained in:
Age Manning
2020-07-29 05:25:10 +00:00
parent f53dedb27d
commit 395d99ce03
5 changed files with 84 additions and 45 deletions

View File

@@ -39,23 +39,33 @@ pub struct Batch<T: EthSpec> {
pub id: BatchId,
/// The requested start slot of the batch, inclusive.
pub start_slot: Slot,
/// The requested end slot of batch, exclusive.
/// The requested end slot of batch, exlcusive.
pub end_slot: Slot,
/// The peer that was originally assigned to the batch.
pub original_peer: PeerId,
/// The `Attempts` that have been made to send us this batch.
pub attempts: Vec<Attempt>,
/// The peer that is currently assigned to the batch.
pub current_peer: PeerId,
/// The number of retries this batch has undergone due to a failed request.
/// This occurs when peers do not respond or we get an RPC error.
pub retries: u8,
/// The number of times this batch has attempted to be re-downloaded and re-processed. This
/// occurs when a batch has been received but cannot be processed.
pub reprocess_retries: u8,
/// Marks the batch as undergoing a re-process, with a hash of the original blocks it received.
pub original_hash: Option<u64>,
/// The blocks that have been downloaded.
pub downloaded_blocks: Vec<SignedBeaconBlock<T>>,
}
/// Represents a peer's attempt and providing the result for this batch.
///
/// Invalid attempts will downscore a peer.
#[derive(PartialEq, Debug)]
pub struct Attempt {
/// The peer that made the attempt.
pub peer_id: PeerId,
/// The hash of the blocks of the attempt.
pub hash: u64,
}
impl<T: EthSpec> Eq for Batch<T> {}
impl<T: EthSpec> Batch<T> {
@@ -64,11 +74,10 @@ impl<T: EthSpec> Batch<T> {
id,
start_slot,
end_slot,
original_peer: peer_id.clone(),
attempts: Vec::new(),
current_peer: peer_id,
retries: 0,
reprocess_retries: 0,
original_hash: None,
downloaded_blocks: Vec::new(),
}
}

View File

@@ -30,10 +30,10 @@ const BATCH_BUFFER_SIZE: u8 = 5;
/// be reported negatively.
const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
#[derive(PartialEq)]
/// A return type for functions that act on a `Chain` which informs the caller whether the chain
/// has been completed and should be removed or to be kept if further processing is
/// required.
#[derive(PartialEq)]
pub enum ProcessingResult {
KeepChain,
RemoveChain,
@@ -325,18 +325,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
*self.to_be_processed_id += 1;
// If the processed batch was not empty, we can validate previous invalidated
// blocks
// blocks including the current batch.
if !batch.downloaded_blocks.is_empty() {
self.mark_processed_batches_as_valid(network, &batch);
}
// Add the current batch to processed batches to be verified in the future. We are
// only uncertain about this batch, if it has not returned all blocks.
if batch.downloaded_blocks.last().map(|block| block.slot())
!= Some(batch.end_slot.saturating_sub(1u64))
{
self.processed_batches.push(batch);
}
// Add the current batch to processed batches to be verified in the future.
self.processed_batches.push(batch);
// check if the chain has completed syncing
if self.current_processed_slot() >= self.target_head_slot {
@@ -432,7 +427,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
last_batch: &Batch<T::EthSpec>,
) {
while !self.processed_batches.is_empty() {
let processed_batch = self.processed_batches.remove(0);
let mut processed_batch = self.processed_batches.remove(0);
if *processed_batch.id >= *last_batch.id {
crit!(self.log, "A processed batch had a greater id than the current process id";
"chain_id" => self.id,
@@ -440,12 +435,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"current_id" => *last_batch.id);
}
if let Some(prev_hash) = processed_batch.original_hash {
// Go through passed attempts and downscore peers that returned invalid batches
while !processed_batch.attempts.is_empty() {
let attempt = processed_batch.attempts.remove(0);
// The validated batch has been re-processed
if prev_hash != processed_batch.hash() {
if attempt.hash != processed_batch.hash() {
// The re-downloaded version was different
if processed_batch.current_peer != processed_batch.original_peer {
// A new peer sent the correct batch, the previous peer did not
if processed_batch.current_peer != attempt.peer_id {
// A different peer sent the correct batch, the previous peer did not
// We negatively score the original peer.
let action = PeerAction::LowToleranceError;
debug!(
@@ -453,10 +450,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",processed_batch.original_peer),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(processed_batch.original_peer, action);
network.report_peer(attempt.peer_id, action);
} else {
// The same peer corrected it's previous mistake. There was an error, so we
// negative score the original peer.
@@ -466,10 +463,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",processed_batch.original_peer),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(processed_batch.original_peer, action);
network.report_peer(attempt.peer_id, action);
}
}
}
@@ -524,7 +521,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
mut batch: Batch<T::EthSpec>,
) {
// marks the batch as attempting to be reprocessed by hashing the downloaded blocks
batch.original_hash = Some(batch.hash());
let attempt = super::batch::Attempt {
peer_id: batch.current_peer.clone(),
hash: batch.hash(),
};
// add this attempt to the batch
batch.attempts.push(attempt);
// remove previously downloaded blocks
batch.downloaded_blocks.clear();
@@ -546,7 +549,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(self.log, "Re-requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer),
"retries" => batch.retries,
@@ -682,7 +685,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(self.log, "Re-Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{:?}", batch.current_peer));
self.send_batch(network, batch);
@@ -707,7 +711,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(self.log, "Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer));
// send the batch
@@ -737,6 +741,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Returns the next required batch from the chain if it exists. If there are no more batches
/// required, `None` is returned.
///
/// Batches are downloaded excluding the first block of the epoch assuming it has already been
/// downloaded.
///
/// For example:
///
///
/// Epoch boundary | |
/// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 |
/// Batch 1 | Batch 2 | Batch 3
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH;
@@ -751,7 +765,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
let batch_start_slot = self.start_epoch.start_slot(slots_per_epoch)
// One is added to the start slot to begin one slot after the epoch boundary
let batch_start_slot = self
.start_epoch
.start_slot(slots_per_epoch)
.saturating_add(1u64)
+ self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch;
// don't request batches beyond the target head slot