mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 19:51:47 +00:00
Documents and simplifies syncing code (#738)
* Add duplication prevention to gossipsub
* Clean up topic logs
* Add content addressed messages for gossip
* Support BlocksByRange step parameter
* Add initial docs to syncing
* Adds documentation and simplify code in sync
* Remove unnecessary comment
* Fix fmt issue
* Add batch log to sync
This commit is contained in:
@@ -23,6 +23,7 @@ const BLOCKS_PER_BATCH: u64 = 50;
|
||||
/// The number of times to retry a batch before the chain is considered failed and removed.
|
||||
const MAX_BATCH_RETRIES: u8 = 5;
|
||||
|
||||
/// A collection of sequential blocks that are requested from peers in a single RPC request.
|
||||
#[derive(PartialEq)]
|
||||
pub struct Batch<T: EthSpec> {
|
||||
/// The ID of the batch; batches are assigned IDs sequentially.
|
||||
@@ -55,6 +56,9 @@ impl<T: EthSpec> PartialOrd for Batch<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A return type for functions that act on a `Chain` which informs the caller whether the chain
|
||||
/// has been completed and should be removed, or should be kept if further processing is
|
||||
/// required.
|
||||
pub enum ProcessingResult {
|
||||
KeepChain,
|
||||
RemoveChain,
|
||||
@@ -86,6 +90,9 @@ impl<T: EthSpec> Batch<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
|
||||
/// root are grouped into the peer pool and queried for batches when downloading the
|
||||
/// chain.
|
||||
pub struct SyncingChain<T: BeaconChainTypes> {
|
||||
/// The original start slot when this chain was initialised.
|
||||
pub start_slot: Slot,
|
||||
@@ -127,7 +134,7 @@ pub enum ChainSyncingState {
|
||||
/// The chain is undergoing syncing.
|
||||
Syncing,
|
||||
/// The chain is temporarily paused whilst an error is rectified.
|
||||
Paused,
|
||||
_Paused,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
@@ -154,6 +161,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A batch of blocks has been received. This function gets run on all chains and should
|
||||
/// return Some if the request id matches a pending request on this chain, or None if it does
|
||||
/// not.
|
||||
///
|
||||
/// If the request corresponds to a pending batch, this function processes the completed
|
||||
/// batch.
|
||||
pub fn on_block_response(
|
||||
&mut self,
|
||||
chain: &Weak<BeaconChain<T>>,
|
||||
@@ -174,6 +187,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A completed batch has been received, process the batch.
|
||||
/// This will return `ProcessingResult::KeepChain` if the chain has not completed or
|
||||
/// failed indicating that further batches are required.
|
||||
fn process_completed_batch(
|
||||
&mut self,
|
||||
chain: Weak<BeaconChain<T>>,
|
||||
@@ -226,103 +242,109 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
//blocks here, manage the queue and process them in another thread as they become
|
||||
//available.
|
||||
|
||||
if self.state != ChainSyncingState::Paused {
|
||||
if self.state == ChainSyncingState::Syncing {
|
||||
// pre-emptively request more blocks from peers whilst we process current blocks,
|
||||
self.send_range_request(network, current_peer);
|
||||
self.send_range_request(network, current_peer, log);
|
||||
}
|
||||
|
||||
// Try and process batches sequentially in the ordered list.
|
||||
let current_process_id = self.to_be_processed_id;
|
||||
for batch in self
|
||||
.completed_batches
|
||||
.iter()
|
||||
.filter(|batch| batch.id >= current_process_id)
|
||||
{
|
||||
if batch.id == self.to_be_processed_id {
|
||||
if batch.downloaded_blocks.is_empty() {
|
||||
// the batch was empty, progress to the next batch
|
||||
self.to_be_processed_id += 1;
|
||||
continue;
|
||||
} else {
|
||||
let mut successes = 0;
|
||||
debug!(log, "Processing batch"; "batch_id" => batch.id);
|
||||
match process_batch(chain.clone(), batch, &mut successes, log) {
|
||||
Ok(_) => {
|
||||
// batch was successfully processed
|
||||
self.last_processed_id = self.to_be_processed_id;
|
||||
self.to_be_processed_id += 1;
|
||||
// Try and process batches sequentially in the ordered list.
|
||||
let current_process_id = self.to_be_processed_id;
|
||||
// keep track of whether any batch was successfully processed, to decide whether to run fork choice
|
||||
let mut successful_block_process = false;
|
||||
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"location" => "batch import success"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "batch import success"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
|
||||
for batch in self
|
||||
.completed_batches
|
||||
.iter()
|
||||
.filter(|batch| batch.id >= current_process_id)
|
||||
{
|
||||
if batch.id != self.to_be_processed_id {
|
||||
// there are no batches to be processed at the moment
|
||||
break;
|
||||
}
|
||||
|
||||
if successes > 0 {
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"block_imports" => successes,
|
||||
"location" => "batch import error"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "batch import error"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
if batch.downloaded_blocks.is_empty() {
|
||||
// the batch was empty, progress to the next batch
|
||||
self.to_be_processed_id += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// batch processing failed
|
||||
// this could be because this batch is invalid, or a previous invalidated batch
|
||||
// is invalid. We need to find out which and downvote the peer that has sent us
|
||||
// an invalid batch.
|
||||
// process the batch
|
||||
// Keep track of successful batches. Run fork choice after all waiting batches have
|
||||
// been processed.
|
||||
debug!(log, "Processing batch"; "batch_id" => batch.id);
|
||||
match process_batch(chain.clone(), batch, log) {
|
||||
Ok(_) => {
|
||||
// batch was successfully processed
|
||||
self.last_processed_id = self.to_be_processed_id;
|
||||
self.to_be_processed_id += 1;
|
||||
successful_block_process = true;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
|
||||
|
||||
// firstly remove any validated batches
|
||||
return self.handle_invalid_batch(chain, network);
|
||||
if successful_block_process {
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"location" => "batch import error"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "batch import error"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// there are no more batches to be processed, end
|
||||
break;
|
||||
|
||||
// batch processing failed
|
||||
// this could be because this batch is invalid, or a previous invalidated batch
|
||||
// is invalid. We need to find out which and downvote the peer that has sent us
|
||||
// an invalid batch.
|
||||
|
||||
// firstly remove any validated batches
|
||||
return self.handle_invalid_batch(chain, network);
|
||||
}
|
||||
}
|
||||
// remove any validated batches
|
||||
let last_processed_id = self.last_processed_id;
|
||||
self.completed_batches
|
||||
.retain(|batch| batch.id >= last_processed_id);
|
||||
|
||||
// check if the chain has completed syncing, if not, request another batch from this peer
|
||||
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot
|
||||
{
|
||||
// chain is completed
|
||||
ProcessingResult::RemoveChain
|
||||
} else {
|
||||
// chain is not completed
|
||||
ProcessingResult::KeepChain
|
||||
}
|
||||
// If we have processed batches, run fork choice
|
||||
if successful_block_process {
|
||||
if let Some(chain) = chain.upgrade() {
|
||||
match chain.fork_choice() {
|
||||
Ok(()) => trace!(
|
||||
log,
|
||||
"Fork choice success";
|
||||
"location" => "batch import success"
|
||||
),
|
||||
Err(e) => error!(
|
||||
log,
|
||||
"Fork choice failed";
|
||||
"error" => format!("{:?}", e),
|
||||
"location" => "batch import success"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove any validated batches
|
||||
let last_processed_id = self.last_processed_id;
|
||||
self.completed_batches
|
||||
.retain(|batch| batch.id >= last_processed_id);
|
||||
|
||||
// check if the chain has completed syncing
|
||||
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot {
|
||||
// chain is completed
|
||||
ProcessingResult::RemoveChain
|
||||
} else {
|
||||
// chain is not completed
|
||||
ProcessingResult::KeepChain
|
||||
}
|
||||
}
|
||||
|
||||
/// An invalid batch has been received that could not be processed.
|
||||
fn handle_invalid_batch(
|
||||
&mut self,
|
||||
_chain: Weak<BeaconChain<T>>,
|
||||
@@ -340,9 +362,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
//
|
||||
|
||||
//TODO: Implement this logic
|
||||
// Currently just fail the chain, and drop all associated peers
|
||||
for peer_id in self.peer_pool.iter() {
|
||||
network.downvote_peer(peer_id.clone());
|
||||
// Currently just fail the chain, and drop all associated peers, removing them from the
|
||||
// peer pool, to prevent re-status
|
||||
for peer_id in self.peer_pool.drain() {
|
||||
network.downvote_peer(peer_id);
|
||||
}
|
||||
ProcessingResult::RemoveChain
|
||||
}
|
||||
@@ -352,6 +375,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
|
||||
// Either a new chain, or an old one with a peer list
|
||||
/// This chain has been requested to start syncing.
|
||||
///
|
||||
/// This could be a new chain, or an old chain that is being resumed.
|
||||
pub fn start_syncing(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
@@ -415,14 +441,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
for peer_id in peers {
|
||||
// send a blocks by range request to the peer
|
||||
self.send_range_request(network, peer_id);
|
||||
self.send_range_request(network, peer_id, log);
|
||||
}
|
||||
|
||||
self.state = ChainSyncingState::Syncing;
|
||||
}
|
||||
|
||||
// A peer has been added, start batch requests for this peer
|
||||
// this should only be called for a syncing chain
|
||||
/// A peer has been added.
|
||||
///
|
||||
/// If the chain is active, this starts requesting batches from this peer.
|
||||
pub fn peer_added(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
@@ -436,24 +463,32 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
|
||||
// find the next batch and request it from the peer
|
||||
self.send_range_request(network, peer_id);
|
||||
self.send_range_request(network, peer_id, log);
|
||||
}
|
||||
|
||||
// Re-STATUS all the peers in this chain
|
||||
/// Sends a STATUS message to all peers in the peer pool.
|
||||
pub fn status_peers(&self, chain: Weak<BeaconChain<T>>, network: &mut SyncNetworkContext) {
|
||||
for peer_id in self.peer_pool.iter() {
|
||||
network.status_peer(chain.clone(), peer_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
fn send_range_request(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) {
|
||||
/// Requests the next required batch from the provided peer.
|
||||
fn send_range_request(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
peer_id: PeerId,
|
||||
log: &slog::Logger,
|
||||
) {
|
||||
// find the next pending batch and request it from the peer
|
||||
if let Some(batch) = self.get_next_batch(peer_id) {
|
||||
debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
|
||||
// send the batch
|
||||
self.send_batch(network, batch);
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the provided batch from the provided peer.
|
||||
fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
|
||||
let request = batch.to_blocks_by_range_request();
|
||||
if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
|
||||
@@ -463,6 +498,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the next required batch from the chain if it exists. If there are no more batches
|
||||
/// required, `None` is returned.
|
||||
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
|
||||
let batch_start_slot =
|
||||
self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
|
||||
@@ -493,9 +530,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
))
|
||||
}
|
||||
|
||||
// Checks if the request_id is associated with this chain. If so, attempts to re-request the
|
||||
// batch. If the batch has exceeded the number of retries, returns Some(true), indicating
|
||||
// the chain should be dropped.
|
||||
/// An RPC error has occurred.
|
||||
///
|
||||
/// Checks if the request_id is associated with this chain. If so, attempts to re-request the
|
||||
/// batch. If the batch has exceeded the number of retries, returns
|
||||
/// `Some(ProcessingResult::RemoveChain)`. Returns `None` if the request isn't related to
|
||||
/// this chain.
|
||||
pub fn inject_error(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
@@ -512,6 +552,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A batch has failed.
|
||||
///
|
||||
/// Attempts to re-request from another peer in the peer pool (if possible) and returns
|
||||
/// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds
|
||||
/// `MAX_BATCH_RETRIES`.
|
||||
pub fn failed_batch(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext,
|
||||
@@ -542,7 +587,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
fn process_batch<T: BeaconChainTypes>(
|
||||
chain: Weak<BeaconChain<T>>,
|
||||
batch: &Batch<T::EthSpec>,
|
||||
successes: &mut usize,
|
||||
log: &Logger,
|
||||
) -> Result<(), String> {
|
||||
for block in &batch.downloaded_blocks {
|
||||
@@ -558,8 +602,6 @@ fn process_batch<T: BeaconChainTypes>(
|
||||
"slot" => block.slot,
|
||||
"block_root" => format!("{}", block_root),
|
||||
);
|
||||
|
||||
*successes += 1
|
||||
}
|
||||
BlockProcessingOutcome::ParentUnknown { parent } => {
|
||||
// blocks should be sequential and all parents should exist
|
||||
|
||||
Reference in New Issue
Block a user