Initial sync re-write. WIP

Age Manning
2019-08-12 22:07:59 +10:00
parent 66419d00ea
commit 5d4d2f35e1
4 changed files with 405 additions and 384 deletions


@@ -0,0 +1,283 @@
// NOTE: import paths below are assumed for this WIP module.
use super::simple_sync::{NetworkContext, FUTURE_SLOT_TOLERANCE};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::BeaconBlocksRequest;
use eth2_libp2p::PeerId;
use slog::{debug, info, warn, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use types::{BeaconBlock, Epoch, EthSpec, Hash256, Slot};

/// The maximum number of blocks that can be requested in a single chunk.
const MAXIMUM_BLOCKS_PER_REQUEST: usize = 10;
/// The maximum number of block requests that may be in flight at any one time.
const SIMULTANEOUS_REQUESTS: usize = 10;
/// A group of sequential slots that is requested from a peer in a single RPC request.
#[derive(Clone)]
struct Chunk {
    id: usize,
    start_slot: Slot,
    end_slot: Slot,
}

/// A chunk that has been downloaded from a peer but is not yet processed.
struct CompletedChunk<E: EthSpec> {
    peer_id: PeerId,
    chunk: Chunk,
    blocks: Vec<BeaconBlock<E>>,
}

/// A chunk whose blocks have been imported into the chain.
struct ProcessedChunk {
    peer_id: PeerId,
    chunk: Chunk,
}
#[derive(PartialEq)]
pub enum SyncState {
    Idle,
    Downloading,
    ColdSync {
        max_wanted_slot: Slot,
        max_wanted_hash: Hash256,
    },
}

/// The state the manager reports back to the caller on each `poll`.
pub enum SyncManagerState {
    /// Send a blocks request to the given peer.
    RequestBlocks(PeerId, BeaconBlocksRequest),
    /// No known peer can service the wanted chunks.
    Stalled,
    /// There is nothing to do.
    Idle,
    /// The given peer returned faulty chunks and should be downgraded.
    DowngradePeer(PeerId),
}
pub struct PeerSyncInfo {
    peer_id: PeerId,
    fork_version: [u8; 4],
    finalized_root: Hash256,
    finalized_epoch: Epoch,
    head_root: Hash256,
    head_slot: Slot,
    requested_slot_skip: Option<(Slot, usize)>,
}
pub(crate) struct SyncManager<T: BeaconChainTypes> {
    /// A reference to the underlying beacon chain.
    chain: Arc<BeaconChain<T>>,
    /// A mapping of Peers to their respective PeerSyncInfo.
    available_peers: HashMap<PeerId, PeerSyncInfo>,
    /// Chunks that are waiting to be requested from a peer.
    wanted_chunks: Vec<Chunk>,
    /// Chunks that have been requested, keyed by the peer servicing them.
    pending_chunks: HashMap<PeerId, Vec<Chunk>>,
    /// Chunks that have been downloaded and are awaiting processing.
    completed_chunks: Vec<CompletedChunk<T::EthSpec>>,
    /// Chunks that have been processed, in ascending chunk-id order.
    processed_chunks: Vec<ProcessedChunk>,
    // TODO(WIP): `MultiPeerSection` is not yet defined.
    // multi_peer_sections: HashMap<PeerId, MultiPeerSection>,
    /// The number of requests currently in flight.
    current_requests: usize,
    /// The id to assign to the next created chunk (ids start at 1).
    current_chunk_id: usize,
    /// The head slot and root most recently wanted during hot sync.
    latest_wanted_slot: Option<(Slot, Hash256)>,
    sync_state: SyncState,
    to_process_chunk_id: usize,
    log: Logger,
}
impl<T: BeaconChainTypes> SyncManager<T> {
    /// Adds a sync-able peer and determines which blocks to download given the current state of
    /// the chain, known peers and currently requested blocks.
    fn add_sync_peer(
        &mut self,
        peer_id: PeerId,
        remote: PeerSyncInfo,
        _network: &mut NetworkContext,
    ) {
        let local = PeerSyncInfo::from(&self.chain);
        let remote_finalized_slot = remote
            .finalized_epoch
            .start_slot(T::EthSpec::slots_per_epoch());
        let _local_finalized_slot = local
            .finalized_epoch
            .start_slot(T::EthSpec::slots_per_epoch());

        if remote_finalized_slot > local.head_slot {
            // cold sync
            if self.sync_state == SyncState::Idle || self.sync_state == SyncState::Downloading {
                info!(self.log, "Cold Sync Started"; "start_slot" => local.head_slot.as_u64(), "latest_known_finalized" => remote_finalized_slot.as_u64());
                self.sync_state = SyncState::ColdSync {
                    max_wanted_slot: Slot::from(0u64),
                    max_wanted_hash: remote.finalized_root,
                };
            }
            if let SyncState::ColdSync { max_wanted_slot, .. } = self.sync_state {
                // We don't assume that our current head is the canonical chain, so we request
                // blocks from our last finalized slot to ensure we are on the finalized chain.
                if max_wanted_slot < remote_finalized_slot {
                    let remaining_blocks = (remote_finalized_slot - max_wanted_slot).as_usize();
                    // Request the range as fixed-size chunks.
                    for chunk in 0..remaining_blocks / MAXIMUM_BLOCKS_PER_REQUEST {
                        self.wanted_chunks.push(Chunk {
                            id: self.current_chunk_id,
                            start_slot: max_wanted_slot
                                + (chunk * MAXIMUM_BLOCKS_PER_REQUEST) as u64,
                            end_slot: max_wanted_slot
                                + ((chunk + 1) * MAXIMUM_BLOCKS_PER_REQUEST) as u64,
                        });
                        self.current_chunk_id += 1;
                    }
                    // add any extra partial chunk
                    if remaining_blocks % MAXIMUM_BLOCKS_PER_REQUEST != 0 {
                        let full_chunks = remaining_blocks / MAXIMUM_BLOCKS_PER_REQUEST;
                        self.wanted_chunks.push(Chunk {
                            id: self.current_chunk_id,
                            start_slot: max_wanted_slot
                                + (full_chunks * MAXIMUM_BLOCKS_PER_REQUEST) as u64,
                            end_slot: remote_finalized_slot,
                        });
                        self.current_chunk_id += 1;
                    }
                    info!(self.log, "Cold Sync Updated"; "start_slot" => local.head_slot.as_u64(), "latest_known_finalized" => remote_finalized_slot.as_u64());
                    self.sync_state = SyncState::ColdSync {
                        max_wanted_slot: remote_finalized_slot,
                        max_wanted_hash: remote.finalized_root,
                    };
                }
            }
        } else {
            // hot sync
            if remote.head_slot > self.chain.head().beacon_state.slot {
                if self.sync_state == SyncState::Idle {
                    self.sync_state = SyncState::Downloading;
                    info!(self.log, "Sync Started"; "start_slot" => local.head_slot.as_u64(), "latest_known_head" => remote.head_slot.as_u64());
                }
                self.latest_wanted_slot = Some((remote.head_slot, remote.head_root));
                // TODO: Build requests.
            }
        }
        self.available_peers.insert(peer_id, remote);
    }
    /// Adds blocks downloaded for a chunk, moving the chunk into `completed_chunks`.
    pub fn add_blocks(
        &mut self,
        chunk_id: usize,
        peer_id: PeerId,
        blocks: Vec<BeaconBlock<T::EthSpec>>,
    ) {
        if let SyncState::ColdSync { .. } = self.sync_state {
            // Find the pending chunk that this request was servicing.
            let chunk = match self.pending_chunks.get_mut(&peer_id) {
                Some(chunks) => match chunks.iter().position(|chunk| chunk.id == chunk_id) {
                    Some(pos) => chunks.remove(pos),
                    None => {
                        warn!(self.log, "Received blocks for an unknown chunk";
                            "peer" => format!("{:?}", peer_id));
                        return;
                    }
                },
                None => {
                    warn!(self.log, "Received blocks without a request";
                        "peer" => format!("{:?}", peer_id));
                    return;
                }
            };
            // add to completed
            self.current_requests -= 1;
            self.completed_chunks.push(CompletedChunk {
                peer_id,
                chunk,
                blocks,
            });
        }
    }
    /// A request to the given peer failed; free the chunk so it can be re-assigned.
    pub fn inject_error(&mut self, id: usize, peer_id: PeerId) {
        if let SyncState::ColdSync { .. } = self.sync_state {
            match self.pending_chunks.get_mut(&peer_id) {
                Some(chunks) => {
                    if let Some(pos) = chunks.iter().position(|c| c.id == id) {
                        chunks.remove(pos);
                    }
                }
                None => {
                    debug!(self.log,
                        "Received an error for an unknown request";
                        "request_id" => id,
                        "peer" => format!("{:?}", peer_id)
                    );
                }
            }
        }
    }
    pub fn poll(&mut self) -> SyncManagerState {
        // cold sync
        if let SyncState::ColdSync { max_wanted_slot, max_wanted_hash } = self.sync_state {
            // Try to process completed chunks, strictly in chunk-id order.
            let completed_chunks = std::mem::replace(&mut self.completed_chunks, Vec::new());
            'chunks: for completed_chunk in completed_chunks {
                let last_chunk_id = {
                    let no_processed_chunks = self.processed_chunks.len();
                    if no_processed_chunks == 0 {
                        0
                    } else {
                        self.processed_chunks[no_processed_chunks - 1].chunk.id
                    }
                };
                if completed_chunk.chunk.id == last_chunk_id + 1 {
                    // try and process the chunk
                    for block in &completed_chunk.blocks {
                        let processing_result = self.chain.process_block(block.clone());
                        if let Ok(outcome) = processing_result {
                            match outcome {
                                BlockProcessingOutcome::Processed { block_root: _ } => {
                                    // block successfully processed
                                }
                                BlockProcessingOutcome::BlockIsAlreadyKnown => {
                                    warn!(
                                        self.log, "Block Already Known";
                                        "sync" => "Cold Sync",
                                        "block_slot" => block.slot.as_u64(),
                                        "peer" => format!("{:?}", completed_chunk.peer_id),
                                    );
                                }
                                outcome => {
                                    // An error has occurred. It could be due to the previous
                                    // chunk or the current chunk, so re-issue both. If both
                                    // came from the same peer, downgrade the peer.
                                    warn!(
                                        self.log, "Faulty Chunk";
                                        "sync" => "Cold Sync",
                                        "block_slot" => block.slot.as_u64(),
                                        "peer" => format!("{:?}", completed_chunk.peer_id),
                                        "outcome" => format!("{:?}", outcome),
                                    );
                                    // re-issue both chunks
                                    self.wanted_chunks.insert(0, completed_chunk.chunk.clone());
                                    if let Some(past_chunk) = self.processed_chunks.pop() {
                                        self.wanted_chunks.insert(0, past_chunk.chunk.clone());
                                        if completed_chunk.peer_id == past_chunk.peer_id {
                                            // downgrade peer
                                            return SyncManagerState::DowngradePeer(
                                                completed_chunk.peer_id.clone(),
                                            );
                                        }
                                    }
                                    continue 'chunks;
                                }
                            }
                        }
                    }
                    // chunk successfully processed
                    debug!(self.log,
                        "Chunk Processed";
                        "id" => completed_chunk.chunk.id,
                        "start_slot" => completed_chunk.chunk.start_slot.as_u64(),
                        "end_slot" => completed_chunk.chunk.end_slot.as_u64(),
                    );
                    self.processed_chunks.push(ProcessedChunk {
                        peer_id: completed_chunk.peer_id.clone(),
                        chunk: completed_chunk.chunk,
                    });
                } else {
                    // Not the next chunk in order; retain it for a later pass.
                    self.completed_chunks.push(completed_chunk);
                }
            }
            // chunks processed, update the state
            self.sync_state = SyncState::ColdSync { max_wanted_slot, max_wanted_hash };
            // TODO: Remove stale chunks and peers.
            // Spawn requests
            if self.current_requests <= SIMULTANEOUS_REQUESTS {
                if !self.wanted_chunks.is_empty() {
                    let chunk = self.wanted_chunks.remove(0);
                    // Find a peer whose finalized chain reaches past the end of this chunk.
                    let servicing_peer = self.available_peers.iter().find_map(|(peer_id, peer)| {
                        let peer_finalized_slot = peer
                            .finalized_epoch
                            .start_slot(T::EthSpec::slots_per_epoch());
                        if peer_finalized_slot >= chunk.end_slot {
                            Some(peer_id.clone())
                        } else {
                            None
                        }
                    });
                    if let Some(peer_id) = servicing_peer {
                        let block_request = BeaconBlocksRequest {
                            head_block_root: max_wanted_hash,
                            start_slot: chunk.start_slot.as_u64(),
                            count: (chunk.end_slot - chunk.start_slot).as_u64(),
                            step: 1,
                        };
                        self.pending_chunks
                            .entry(peer_id.clone())
                            .or_insert_with(Vec::new)
                            .push(chunk);
                        self.current_requests += 1;
                        return SyncManagerState::RequestBlocks(peer_id, block_request);
                    }
                    // no peers can service this chunk; stall until more peers are available
                    self.wanted_chunks.push(chunk);
                    return SyncManagerState::Stalled;
                }
            }
        }
        // hot sync requests are not yet built (see `add_sync_peer`)
        SyncManagerState::Idle
    }
}
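The chunk arithmetic in `add_sync_peer` above is the heart of the cold-sync strategy, so a worked example may help. Below is a minimal standalone sketch (a hypothetical `partition_chunks` helper, using plain `u64` slots in place of the `Slot` type) of how the wanted range is split into full chunks of `MAXIMUM_BLOCKS_PER_REQUEST` slots plus one trailing partial chunk:

fn partition_chunks(last_wanted: u64, remote_finalized: u64) -> Vec<(u64, u64)> {
    const MAXIMUM_BLOCKS_PER_REQUEST: u64 = 10;
    let remaining = remote_finalized - last_wanted;
    let mut chunks = Vec::new();
    // Full chunks, each spanning exactly MAXIMUM_BLOCKS_PER_REQUEST slots.
    for i in 0..remaining / MAXIMUM_BLOCKS_PER_REQUEST {
        let start = last_wanted + i * MAXIMUM_BLOCKS_PER_REQUEST;
        chunks.push((start, start + MAXIMUM_BLOCKS_PER_REQUEST));
    }
    // A trailing partial chunk covers any remainder up to the finalized slot.
    if remaining % MAXIMUM_BLOCKS_PER_REQUEST != 0 {
        let full = remaining / MAXIMUM_BLOCKS_PER_REQUEST;
        chunks.push((last_wanted + full * MAXIMUM_BLOCKS_PER_REQUEST, remote_finalized));
    }
    chunks
}

// partition_chunks(0, 25) == [(0, 10), (10, 20), (20, 25)]

Each tuple maps onto one `BeaconBlocksRequest` (`start_slot`, `count = end_slot - start_slot`), which is why a faulty chunk can later be retried by simply re-inserting it into `wanted_chunks`.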


@@ -17,7 +17,7 @@ use types::{
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100;
-/// The amount of seconds a block (or partial block) may exist in the import queue.
+/// The amount of seconds a block may exist in the import queue.
const QUEUE_STALE_SECS: u64 = 100;
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
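For context, the tolerance check itself is a single comparison against the local slot clock. A minimal sketch (hypothetical helper, not part of this diff):

fn exceeds_future_tolerance(block_slot: u64, current_slot: u64, tolerance: u64) -> bool {
    // With tolerance = FUTURE_SLOT_TOLERANCE, a block at current_slot + tolerance + 1 is dropped.
    block_slot > current_slot + tolerance
}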
@@ -30,23 +30,23 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
-network_id: u8,
-chain_id: u64,
-latest_finalized_root: Hash256,
-latest_finalized_epoch: Epoch,
-best_root: Hash256,
-best_slot: Slot,
+fork_version: [u8; 4],
+finalized_root: Hash256,
+finalized_epoch: Epoch,
+head_root: Hash256,
+head_slot: Slot,
+requested_slot_skip: Option<(Slot, usize)>,
}
impl From<HelloMessage> for PeerSyncInfo {
fn from(hello: HelloMessage) -> PeerSyncInfo {
PeerSyncInfo {
-network_id: hello.network_id,
-chain_id: hello.chain_id,
-latest_finalized_root: hello.latest_finalized_root,
-latest_finalized_epoch: hello.latest_finalized_epoch,
-best_root: hello.best_root,
-best_slot: hello.best_slot,
+fork_version: hello.fork_version,
+finalized_root: hello.finalized_root,
+finalized_epoch: hello.finalized_epoch,
+head_root: hello.head_root,
+head_slot: hello.head_slot,
+requested_slot_skip: None,
}
}
}
@@ -71,8 +71,6 @@ pub struct SimpleSync<T: BeaconChainTypes> {
chain: Arc<BeaconChain<T>>,
/// A mapping of Peers to their respective PeerSyncInfo.
known_peers: HashMap<PeerId, PeerSyncInfo>,
-/// A queue to allow importing of blocks
-import_queue: ImportQueue<T>,
/// The current state of the syncing protocol.
state: SyncState,
log: slog::Logger,
@@ -178,8 +176,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
-if local.network_id != remote.network_id {
-// The node is on a different network, disconnect them.
+if local.fork_version != remote.fork_version {
+// The node is on a different network/fork, disconnect them.
info!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
@@ -187,9 +185,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
);
network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
-} else if remote.latest_finalized_epoch <= local.latest_finalized_epoch
-&& remote.latest_finalized_root != Hash256::zero()
-&& local.latest_finalized_root != Hash256::zero()
+} else if remote.finalized_epoch <= local.finalized_epoch
+&& remote.finalized_root != Hash256::zero()
+&& local.finalized_root != Hash256::zero()
&& (self.root_at_slot(start_slot(remote.latest_finalized_epoch))
!= Some(remote.latest_finalized_root))
{
@@ -248,22 +246,37 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"remote_latest_finalized_epoch" => remote.latest_finalized_epoch,
);
-let start_slot = local
-.latest_finalized_epoch
-.start_slot(T::EthSpec::slots_per_epoch());
-let required_slots = remote.best_slot - start_slot;
-self.request_block_roots(
-peer_id,
-BeaconBlockRootsRequest {
-start_slot,
-count: required_slots.as_u64(),
-},
-network,
-);
+self.process_sync(network);
}
}
+fn process_sync(&mut self, network: &mut NetworkContext) {
+loop {
+match self.sync_manager.poll() {
+SyncManagerState::RequestBlocks(peer_id, req) => {
+debug!(
+self.log,
+"RPCRequest(BeaconBlocks)";
+"count" => req.count,
+"peer" => format!("{:?}", peer_id)
+);
+network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(req));
+}
+SyncManagerState::DowngradePeer(_peer_id) => {
+// TODO: downgrade the peer's reputation
+}
+SyncManagerState::Stalled => {
+// need more peers to continue sync
+warn!(self.log, "No usable peers for sync");
+break;
+}
+SyncManagerState::Idle => {
+// nothing to do
+break;
+}
+}
+}
+}
fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
self.chain
.rev_iter_block_roots(target_slot)
@@ -272,213 +285,27 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.map(|(root, _slot)| root)
}
-/// Handle a `BeaconBlockRoots` request from the peer.
-pub fn on_beacon_block_roots_request(
+/// Handle a `BeaconBlocks` request from the peer.
+pub fn on_beacon_blocks_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
-req: BeaconBlockRootsRequest,
+req: BeaconBlocksRequest,
network: &mut NetworkContext,
) {
let state = &self.chain.head().beacon_state;
debug!(
self.log,
"BlockRootsRequest";
"BeaconBlocksRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.count,
"start_slot" => req.start_slot,
);
-let mut roots: Vec<BlockRootSlot> = self
-.chain
-.rev_iter_block_roots(std::cmp::min(req.start_slot + req.count, state.slot))
-.take_while(|(_root, slot)| req.start_slot <= *slot)
-.map(|(block_root, slot)| BlockRootSlot { slot, block_root })
-.collect();
-if roots.len() as u64 != req.count {
-debug!(
-self.log,
-"BlockRootsRequest";
-"peer" => format!("{:?}", peer_id),
-"msg" => "Failed to return all requested hashes",
-"start_slot" => req.start_slot,
-"current_slot" => self.chain.present_slot(),
-"requested" => req.count,
-"returned" => roots.len(),
-);
-}
-roots.reverse();
-roots.dedup_by_key(|brs| brs.block_root);
-network.send_rpc_response(
-peer_id,
-request_id,
-RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }),
-)
-}
-/// Handle a `BeaconBlockRoots` response from the peer.
-pub fn on_beacon_block_roots_response(
-&mut self,
-peer_id: PeerId,
-res: BeaconBlockRootsResponse,
-network: &mut NetworkContext,
-) {
-debug!(
-self.log,
-"BlockRootsResponse";
-"peer" => format!("{:?}", peer_id),
-"count" => res.roots.len(),
-);
-if res.roots.is_empty() {
-warn!(
-self.log,
-"Peer returned empty block roots response";
-"peer_id" => format!("{:?}", peer_id)
-);
-return;
-}
-// The wire protocol specifies that slots must be in ascending order.
-if !res.slots_are_ascending() {
-warn!(
-self.log,
-"Peer returned block roots response with bad slot ordering";
-"peer_id" => format!("{:?}", peer_id)
-);
-return;
-}
-let new_roots = self
-.import_queue
-.enqueue_block_roots(&res.roots, peer_id.clone());
-// No new roots means nothing to do.
-//
-// This check protects against future panics.
-if new_roots.is_empty() {
-return;
-}
-// Determine the first (earliest) and last (latest) `BlockRootSlot` items.
-//
-// This logic relies upon slots to be in ascending order, which is enforced earlier.
-let first = new_roots.first().expect("Non-empty list must have first");
-let last = new_roots.last().expect("Non-empty list must have last");
-// Request all headers between the earliest and latest new `BlockRootSlot` items.
-self.request_block_headers(
-peer_id,
-BeaconBlockHeadersRequest {
-start_root: first.block_root,
-start_slot: first.slot,
-max_headers: (last.slot - first.slot + 1).as_u64(),
-skip_slots: 0,
-},
-network,
-)
-}
-/// Handle a `BeaconBlockHeaders` request from the peer.
-pub fn on_beacon_block_headers_request(
-&mut self,
-peer_id: PeerId,
-request_id: RequestId,
-req: BeaconBlockHeadersRequest,
-network: &mut NetworkContext,
-) {
-let state = &self.chain.head().beacon_state;
-debug!(
-self.log,
-"BlockHeadersRequest";
-"peer" => format!("{:?}", peer_id),
-"count" => req.max_headers,
-);
-let count = req.max_headers;
-// Collect the block roots.
-let mut roots: Vec<Hash256> = self
-.chain
-.rev_iter_block_roots(std::cmp::min(req.start_slot + count, state.slot))
-.take_while(|(_root, slot)| req.start_slot <= *slot)
-.map(|(root, _slot)| root)
-.collect();
-roots.reverse();
-roots.dedup();
-let headers: Vec<BeaconBlockHeader> = roots
-.into_iter()
-.step_by(req.skip_slots as usize + 1)
-.filter_map(|root| {
-let block = self
-.chain
-.store
-.get::<BeaconBlock<T::EthSpec>>(&root)
-.ok()?;
-Some(block?.block_header())
-})
-.collect();
-// ssz-encode the headers
-let headers = headers.as_ssz_bytes();
-network.send_rpc_response(
-peer_id,
-request_id,
-RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }),
-)
-}
-/// Handle a `BeaconBlockHeaders` response from the peer.
-pub fn on_beacon_block_headers_response(
-&mut self,
-peer_id: PeerId,
-headers: Vec<BeaconBlockHeader>,
-network: &mut NetworkContext,
-) {
-debug!(
-self.log,
-"BlockHeadersResponse";
-"peer" => format!("{:?}", peer_id),
-"count" => headers.len(),
-);
-if headers.is_empty() {
-warn!(
-self.log,
-"Peer returned empty block headers response. PeerId: {:?}", peer_id
-);
-return;
-}
-// Enqueue the headers, obtaining a list of the roots of the headers which were newly added
-// to the queue.
-let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone());
-if !block_roots.is_empty() {
-self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
-}
-}
-/// Handle a `BeaconBlockBodies` request from the peer.
-pub fn on_beacon_block_bodies_request(
-&mut self,
-peer_id: PeerId,
-request_id: RequestId,
-req: BeaconBlockBodiesRequest,
-network: &mut NetworkContext,
-) {
-let block_bodies: Vec<BeaconBlockBody<_>> = req
-.block_roots
-.iter()
-.filter_map(|root| {
+let mut blocks: Vec<BeaconBlock<T::EthSpec>> = self
+.chain
+.rev_iter_block_roots()
+.filter(|(_root, slot)| req.start_slot <= *slot && req.start_slot + req.count >= *slot)
+.take_while(|(_root, slot)| req.start_slot <= *slot)
+.filter_map(|(root, _slot)| {
+if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
+Some(block)
+} else {
+None
+}
@@ -494,59 +321,49 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
})
.collect();
-debug!(
-self.log,
-"BlockBodiesRequest";
-"peer" => format!("{:?}", peer_id),
-"requested" => req.block_roots.len(),
-"returned" => block_bodies.len(),
-);
-let bytes = block_bodies.as_ssz_bytes();
+blocks.reverse();
+blocks.dedup_by_key(|block| block.slot);
+if blocks.len() as u64 != req.count {
+debug!(
+self.log,
+"BeaconBlocksRequest";
+"peer" => format!("{:?}", peer_id),
+"msg" => "Failed to return all requested hashes",
+"start_slot" => req.start_slot,
+"current_slot" => self.chain.present_slot(),
+"requested" => req.count,
+"returned" => blocks.len(),
+);
+}
network.send_rpc_response(
peer_id,
request_id,
-RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse {
-block_bodies: bytes,
-block_roots: None,
-}),
+RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()),
)
}
-/// Handle a `BeaconBlockBodies` response from the peer.
-pub fn on_beacon_block_bodies_response(
+/// Handle a `BeaconBlocks` response from the peer.
+pub fn on_beacon_blocks_response(
&mut self,
peer_id: PeerId,
-res: DecodedBeaconBlockBodiesResponse<T::EthSpec>,
+res: Vec<BeaconBlock<T::EthSpec>>,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockBodiesResponse";
"BeaconBlocksResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.block_bodies.len(),
);
-if !res.block_bodies.is_empty() {
-// Import all blocks to queue
-let last_root = self
-.import_queue
-.enqueue_bodies(res.block_bodies, peer_id.clone());
-// Attempt to process all received bodies by recursively processing the latest block
-if let Some(root) = last_root {
-if let Some(BlockProcessingOutcome::Processed { .. }) =
-self.attempt_process_partial_block(peer_id, root, network, &"rpc")
-{
-// If processing is successful remove from `import_queue`
-self.import_queue.remove(root);
-}
-}
+if !res.is_empty() {
+// TODO: plumb the RPC request id through to the sync manager's `add_blocks`.
+self.sync_manager.add_blocks(peer_id, res);
+}
-// Clear out old entries
-self.import_queue.remove_stale();
+self.process_sync(network);
}
/// Process a gossip message declaring a new block.
@@ -679,22 +496,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req));
}
-/// Request some `BeaconBlockBodies` from the remote peer.
-fn request_block_bodies(
-&mut self,
-peer_id: PeerId,
-req: BeaconBlockBodiesRequest,
-network: &mut NetworkContext,
-) {
-debug!(
-self.log,
-"RPCRequest(BeaconBlockBodies)";
-"count" => req.block_roots.len(),
-"peer" => format!("{:?}", peer_id)
-);
-network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
-}
/// Returns `true` if `self.chain` has not yet processed this block.
pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool {