Sync fixes (#1716)

## Issue Addressed

chain state inconsistencies

## Proposed Changes
- a batch can be fake-failed by Range if it needs to move a peer to another chain. The peer will still send blocks/ errors / produce timeouts for those  requests, so check when we get a response from the RPC that the request id matches, instead of only the peer, since a re-request can be directed to the same peer.
- if an optimistic batch succeeds, store the attempt to avoid trying it again when quickly switching chains. Also, use it only if ahead of our current target, instead of the segment's start epoch
This commit is contained in:
divma
2020-10-04 23:49:14 +00:00
parent d72c026d32
commit 86a18e72c4
5 changed files with 81 additions and 70 deletions

View File

@@ -1,3 +1,4 @@
use crate::sync::RequestId;
use eth2_libp2p::rpc::methods::BlocksByRangeRequest;
use eth2_libp2p::PeerId;
use ssz::Encode;
@@ -32,7 +33,7 @@ pub enum BatchState<T: EthSpec> {
/// The batch has failed either downloading or processing, but can be requested again.
AwaitingDownload,
/// The batch is being downloaded.
Downloading(PeerId, Vec<SignedBeaconBlock<T>>),
Downloading(PeerId, Vec<SignedBeaconBlock<T>>, RequestId),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<SignedBeaconBlock<T>>),
/// The batch is being processed.
@@ -99,7 +100,7 @@ impl<T: EthSpec> BatchInfo<T> {
pub fn current_peer(&self) -> Option<&PeerId> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None,
BatchState::Downloading(peer_id, _)
BatchState::Downloading(peer_id, _, _)
| BatchState::AwaitingProcessing(peer_id, _)
| BatchState::Processing(Attempt { peer_id, .. })
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(&peer_id),
@@ -126,9 +127,9 @@ impl<T: EthSpec> BatchInfo<T> {
/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: SignedBeaconBlock<T>) {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks) => {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
self.state = BatchState::Downloading(peer, blocks)
self.state = BatchState::Downloading(peer, blocks, req_id)
}
other => unreachable!("Add block for batch in wrong state: {:?}", other),
}
@@ -148,7 +149,7 @@ impl<T: EthSpec> BatchInfo<T> {
),
> {
match self.state.poison() {
BatchState::Downloading(peer, blocks) => {
BatchState::Downloading(peer, blocks, _request_id) => {
// verify that blocks are in range
if let Some(last_slot) = blocks.last().map(|b| b.slot()) {
// the batch is non-empty
@@ -189,7 +190,7 @@ impl<T: EthSpec> BatchInfo<T> {
#[must_use = "Batch may have failed"]
pub fn download_failed(&mut self) -> &BatchState<T> {
match self.state.poison() {
BatchState::Downloading(peer, _) => {
BatchState::Downloading(peer, _, _request_id) => {
// register the attempt and check if the batch can be tried again
self.failed_download_attempts.push(peer);
self.state = if self.failed_download_attempts.len()
@@ -206,10 +207,10 @@ impl<T: EthSpec> BatchInfo<T> {
}
}
pub fn start_downloading_from_peer(&mut self, peer: PeerId) {
pub fn start_downloading_from_peer(&mut self, peer: PeerId, request_id: RequestId) {
match self.state.poison() {
BatchState::AwaitingDownload => {
self.state = BatchState::Downloading(peer, Vec::new());
self.state = BatchState::Downloading(peer, Vec::new(), request_id);
}
other => unreachable!("Starting download for batch in wrong state: {:?}", other),
}
@@ -333,9 +334,13 @@ impl<T: EthSpec> std::fmt::Debug for BatchState<T> {
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
}
BatchState::Downloading(peer, blocks) => {
write!(f, "Downloading({}, {} blocks)", peer, blocks.len())
}
BatchState::Downloading(peer, blocks, request_id) => write!(
f,
"Downloading({}, {} blocks, {})",
peer,
blocks.len(),
request_id
),
BatchState::Poisoned => f.write_str("Poisoned"),
}
}