mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-10 04:01:51 +00:00
Account manager, bootnodes, RPC display and sync fixes
This commit is contained in:
@@ -68,7 +68,7 @@ use types::{BeaconBlock, EthSpec, Hash256, Slot};
|
||||
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
|
||||
/// is requested. Currently the value is small for testing. This will be incremented for
|
||||
/// production.
|
||||
const MAX_BLOCKS_PER_REQUEST: u64 = 10;
|
||||
const MAX_BLOCKS_PER_REQUEST: u64 = 100;
|
||||
|
||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
|
||||
@@ -120,6 +120,8 @@ struct BlockRequests<T: EthSpec> {
|
||||
target_head_root: Hash256,
|
||||
/// The blocks that we have currently downloaded from the peer that are yet to be processed.
|
||||
downloaded_blocks: Vec<BeaconBlock<T>>,
|
||||
/// The number of blocks successfully processed in this request.
|
||||
blocks_processed: usize,
|
||||
/// The number of empty batches we have consecutively received. If a peer returns more than
|
||||
/// EMPTY_BATCHES_TOLERANCE, they are dropped.
|
||||
consecutive_empty_batches: usize,
|
||||
@@ -302,6 +304,7 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
target_head_root: remote.head_root,
|
||||
consecutive_empty_batches: 0,
|
||||
downloaded_blocks: Vec::new(),
|
||||
blocks_processed: 0,
|
||||
state: BlockRequestsState::Queued,
|
||||
sync_direction: SyncDirection::Initial,
|
||||
current_start_slot: chain.best_slot(),
|
||||
@@ -356,6 +359,10 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
warn!(self.log, "Peer returned too many empty block batches";
|
||||
"peer" => format!("{:?}", peer_id));
|
||||
block_requests.state = BlockRequestsState::Failed;
|
||||
} else if block_requests.current_start_slot >= block_requests.target_head_slot {
|
||||
warn!(self.log, "Peer did not return blocks it claimed to possess";
|
||||
"peer" => format!("{:?}", peer_id));
|
||||
block_requests.state = BlockRequestsState::Failed;
|
||||
} else {
|
||||
block_requests.update_start_slot();
|
||||
}
|
||||
@@ -561,19 +568,19 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
// only process batch requests if there are any
|
||||
if !self.import_queue.is_empty() {
|
||||
// process potential block requests
|
||||
self.process_potential_block_requests();
|
||||
re_run = re_run || self.process_potential_block_requests();
|
||||
|
||||
// process any complete long-range batches
|
||||
re_run = self.process_complete_batches();
|
||||
re_run = re_run || self.process_complete_batches();
|
||||
}
|
||||
|
||||
// only process parent objects if we are in regular sync
|
||||
if let ManagerState::Regular = self.state {
|
||||
if !self.parent_queue.is_empty() {
|
||||
// process any parent block lookup-requests
|
||||
self.process_parent_requests();
|
||||
re_run = re_run || self.process_parent_requests();
|
||||
|
||||
// process any complete parent lookups
|
||||
re_run = self.process_complete_parent_requests();
|
||||
re_run = re_run || self.process_complete_parent_requests();
|
||||
}
|
||||
|
||||
// return any queued events
|
||||
@@ -613,20 +620,23 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn process_potential_block_requests(&mut self) {
|
||||
fn process_potential_block_requests(&mut self) -> bool {
|
||||
// check if an outbound request is required
|
||||
// Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p
|
||||
// layer and not needed here. Therefore we create many outbound requests and let the RPC
|
||||
// handle the number of simultaneous requests. Request all queued objects.
|
||||
|
||||
let mut re_run = false;
|
||||
// remove any failed batches
|
||||
let debug_log = &self.log;
|
||||
let full_peer_ref = &mut self.full_peers;
|
||||
self.import_queue.retain(|peer_id, block_request| {
|
||||
if let BlockRequestsState::Failed = block_request.state {
|
||||
debug!(debug_log, "Block import from peer failed";
|
||||
"peer_id" => format!("{:?}", peer_id),
|
||||
"downloaded_blocks" => block_request.downloaded_blocks.len()
|
||||
"downloaded_blocks" => block_request.blocks_processed
|
||||
);
|
||||
full_peer_ref.remove(peer_id);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -654,7 +664,10 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
request,
|
||||
request_id,
|
||||
});
|
||||
re_run = true;
|
||||
}
|
||||
|
||||
re_run
|
||||
}
|
||||
|
||||
fn process_complete_batches(&mut self) -> bool {
|
||||
@@ -667,66 +680,75 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
let event_queue_ref = &mut self.event_queue;
|
||||
|
||||
self.import_queue.retain(|peer_id, block_requests| {
|
||||
// check that the chain still exists
|
||||
if let Some(chain) = chain_ref.upgrade() {
|
||||
let downloaded_blocks =
|
||||
std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new());
|
||||
let last_element = block_requests.downloaded_blocks.len() - 1;
|
||||
let start_slot = block_requests.downloaded_blocks[0].slot;
|
||||
let end_slot = block_requests.downloaded_blocks[last_element].slot;
|
||||
if block_requests.state == BlockRequestsState::ReadyToProcess {
|
||||
// check that the chain still exists
|
||||
if let Some(chain) = chain_ref.upgrade() {
|
||||
let downloaded_blocks =
|
||||
std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new());
|
||||
let last_element = downloaded_blocks.len() - 1;
|
||||
let start_slot = downloaded_blocks[0].slot;
|
||||
let end_slot = downloaded_blocks[last_element].slot;
|
||||
|
||||
match process_blocks(chain, downloaded_blocks, log_ref) {
|
||||
Ok(()) => {
|
||||
debug!(log_ref, "Blocks processed successfully";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"start_slot" => start_slot,
|
||||
"end_slot" => end_slot,
|
||||
"no_blocks" => last_element + 1,
|
||||
);
|
||||
|
||||
// check if the batch is complete, by verifying if we have reached the
|
||||
// target head
|
||||
if end_slot >= block_requests.target_head_slot {
|
||||
// Completed, re-hello the peer to ensure we are up to the latest head
|
||||
event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone()));
|
||||
// remove the request
|
||||
false
|
||||
} else {
|
||||
// have not reached the end, queue another batch
|
||||
block_requests.update_start_slot();
|
||||
re_run = true;
|
||||
// keep the batch
|
||||
true
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(log_ref, "Block processing failed";
|
||||
match process_blocks(chain, downloaded_blocks, log_ref) {
|
||||
Ok(()) => {
|
||||
debug!(log_ref, "Blocks processed successfully";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"start_slot" => start_slot,
|
||||
"end_slot" => end_slot,
|
||||
"no_blocks" => last_element + 1,
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone()));
|
||||
false
|
||||
);
|
||||
block_requests.blocks_processed += last_element + 1;
|
||||
|
||||
// check if the batch is complete, by verifying if we have reached the
|
||||
// target head
|
||||
if end_slot >= block_requests.target_head_slot {
|
||||
// Completed, re-hello the peer to ensure we are up to the latest head
|
||||
event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone()));
|
||||
// remove the request
|
||||
false
|
||||
} else {
|
||||
// have not reached the end, queue another batch
|
||||
block_requests.update_start_slot();
|
||||
re_run = true;
|
||||
// keep the batch
|
||||
true
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(log_ref, "Block processing failed";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"start_slot" => start_slot,
|
||||
"end_slot" => end_slot,
|
||||
"no_blocks" => last_element + 1,
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
event_queue_ref
|
||||
.push(ImportManagerOutcome::DownvotePeer(peer_id.clone()));
|
||||
false
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// chain no longer exists, empty the queue and return
|
||||
event_queue_ref.clear();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
// chain no longer exists, empty the queue and return
|
||||
event_queue_ref.clear();
|
||||
return false;
|
||||
// not ready to process
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
re_run
|
||||
}
|
||||
|
||||
fn process_parent_requests(&mut self) {
|
||||
fn process_parent_requests(&mut self) -> bool {
|
||||
// check to make sure there are peers to search for the parent from
|
||||
if self.full_peers.is_empty() {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut re_run = false;
|
||||
|
||||
// remove any failed requests
|
||||
let debug_log = &self.log;
|
||||
self.parent_queue.retain(|parent_request| {
|
||||
@@ -766,8 +788,10 @@ impl<T: BeaconChainTypes> ImportManager<T> {
|
||||
|
||||
self.event_queue
|
||||
.push(ImportManagerOutcome::RecentRequest(peer_id.clone(), req));
|
||||
re_run = true;
|
||||
}
|
||||
}
|
||||
re_run
|
||||
}
|
||||
|
||||
fn process_complete_parent_requests(&mut self) -> bool {
|
||||
|
||||
@@ -453,7 +453,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
}
|
||||
BlockProcessingOutcome::ParentUnknown { parent: _ } => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
trace!(self.log, "Unknown parent gossip";
|
||||
trace!(self.log, "Block with unknown parent received";
|
||||
"peer_id" => format!("{:?}",peer_id));
|
||||
self.manager.add_unknown_block(block.clone(), peer_id);
|
||||
SHOULD_FORWARD_GOSSIP_BLOCK
|
||||
|
||||
Reference in New Issue
Block a user