mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 11:41:51 +00:00
Allow sync to to request block bodies.
This commit is contained in:
@@ -3,10 +3,12 @@ use crate::message_handler::NetworkContext;
|
||||
use eth2_libp2p::rpc::methods::*;
|
||||
use eth2_libp2p::rpc::{RPCRequest, RPCResponse};
|
||||
use eth2_libp2p::PeerId;
|
||||
use slog::{debug, o};
|
||||
use slog::{debug, error, o, warn};
|
||||
use ssz::TreeHash;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use types::{Epoch, Hash256, Slot};
|
||||
use std::time::Instant;
|
||||
use types::{BeaconBlockHeader, Epoch, Hash256, Slot};
|
||||
|
||||
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
|
||||
const SLOT_IMPORT_TOLERANCE: u64 = 100;
|
||||
@@ -78,7 +80,7 @@ impl From<&Arc<BeaconChain>> for PeerSyncInfo {
|
||||
pub enum SyncState {
|
||||
Idle,
|
||||
Downloading,
|
||||
Stopped,
|
||||
_Stopped,
|
||||
}
|
||||
|
||||
/// Simple Syncing protocol.
|
||||
@@ -89,6 +91,8 @@ pub struct SimpleSync {
|
||||
chain: Arc<BeaconChain>,
|
||||
/// A mapping of Peers to their respective PeerSyncInfo.
|
||||
known_peers: HashMap<PeerId, PeerSyncInfo>,
|
||||
/// A queue to allow importing of blocks
|
||||
import_queue: ImportQueue,
|
||||
/// The current state of the syncing protocol.
|
||||
state: SyncState,
|
||||
/// Sync logger.
|
||||
@@ -97,11 +101,12 @@ pub struct SimpleSync {
|
||||
|
||||
impl SimpleSync {
|
||||
pub fn new(beacon_chain: Arc<BeaconChain>, log: &slog::Logger) -> Self {
|
||||
let state = beacon_chain.get_state();
|
||||
let sync_logger = log.new(o!("Service"=> "Sync"));
|
||||
let import_queue = ImportQueue::new(beacon_chain.clone(), log.clone());
|
||||
SimpleSync {
|
||||
chain: beacon_chain.clone(),
|
||||
known_peers: HashMap::new(),
|
||||
import_queue,
|
||||
state: SyncState::Idle,
|
||||
log: sync_logger,
|
||||
}
|
||||
@@ -149,15 +154,24 @@ impl SimpleSync {
|
||||
.start_slot(spec.slots_per_epoch);
|
||||
let required_slots = start_slot - local.best_slot;
|
||||
|
||||
self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network);
|
||||
self.request_block_roots(
|
||||
peer_id,
|
||||
BeaconBlockRootsRequest {
|
||||
start_slot,
|
||||
count: required_slots.into(),
|
||||
},
|
||||
network,
|
||||
);
|
||||
}
|
||||
PeerStatus::HigherBestSlot => {
|
||||
let required_slots = remote.best_slot - local.best_slot;
|
||||
|
||||
self.request_block_roots(
|
||||
peer_id,
|
||||
local.best_slot,
|
||||
required_slots.as_u64(),
|
||||
BeaconBlockRootsRequest {
|
||||
start_slot: local.best_slot + 1,
|
||||
count: required_slots.into(),
|
||||
},
|
||||
network,
|
||||
);
|
||||
}
|
||||
@@ -168,32 +182,109 @@ impl SimpleSync {
|
||||
pub fn on_beacon_block_roots_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
reponse: BeaconBlockRootsResponse,
|
||||
response: BeaconBlockRootsResponse,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
if response.roots.is_empty() {
|
||||
warn!(
|
||||
self.log,
|
||||
"Peer returned empty block roots response. PeerId: {:?}", peer_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let new_root_index = self.import_queue.first_new_root(&response.roots);
|
||||
|
||||
// If a new block root is found, request it and all the headers following it.
|
||||
//
|
||||
// We make an assumption here that if we don't know a block then we don't know of all
|
||||
// it's parents. This might not be the case if syncing becomes more sophisticated.
|
||||
if let Some(i) = new_root_index {
|
||||
let new = &response.roots[i];
|
||||
|
||||
self.request_block_headers(
|
||||
peer_id,
|
||||
BeaconBlockHeadersRequest {
|
||||
start_root: new.block_root,
|
||||
start_slot: new.slot,
|
||||
max_headers: (response.roots.len() - i) as u64,
|
||||
skip_slots: 0,
|
||||
},
|
||||
network,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_beacon_block_headers_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
response: BeaconBlockHeadersResponse,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
if response.headers.is_empty() {
|
||||
warn!(
|
||||
self.log,
|
||||
"Peer returned empty block headers response. PeerId: {:?}", peer_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let block_roots = self.import_queue.enqueue_headers(response.headers);
|
||||
|
||||
if !block_roots.is_empty() {
|
||||
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
|
||||
}
|
||||
}
|
||||
|
||||
fn request_block_roots(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
start_slot: Slot,
|
||||
count: u64,
|
||||
request: BeaconBlockRootsRequest,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
// Potentially set state to sync.
|
||||
if self.state == SyncState::Idle && count > SLOT_IMPORT_TOLERANCE {
|
||||
if self.state == SyncState::Idle && request.count > SLOT_IMPORT_TOLERANCE {
|
||||
debug!(self.log, "Entering downloading sync state.");
|
||||
self.state = SyncState::Downloading;
|
||||
}
|
||||
|
||||
debug!(self.log, "Requesting {} blocks from {:?}.", count, &peer_id);
|
||||
debug!(
|
||||
self.log,
|
||||
"Requesting {} block roots from {:?}.", request.count, &peer_id
|
||||
);
|
||||
|
||||
// TODO: handle count > max count.
|
||||
network.send_rpc_request(
|
||||
peer_id.clone(),
|
||||
RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest { start_slot, count }),
|
||||
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(request));
|
||||
}
|
||||
|
||||
fn request_block_headers(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: BeaconBlockHeadersRequest,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Requesting {} headers from {:?}.", request.max_headers, &peer_id
|
||||
);
|
||||
|
||||
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(request));
|
||||
}
|
||||
|
||||
fn request_block_bodies(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: BeaconBlockBodiesRequest,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Requesting {} bodies from {:?}.",
|
||||
request.block_roots.len(),
|
||||
&peer_id
|
||||
);
|
||||
|
||||
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(request));
|
||||
}
|
||||
|
||||
/// Generates our current state in the form of a HELLO RPC message.
|
||||
@@ -201,3 +292,82 @@ impl SimpleSync {
|
||||
self.chain.hello_message()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ImportQueue {
|
||||
/// BeaconChain
|
||||
pub chain: Arc<BeaconChain>,
|
||||
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
|
||||
pub partials: HashMap<Hash256, PartialBeaconBlock>,
|
||||
/// Logging
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl ImportQueue {
|
||||
pub fn new(chain: Arc<BeaconChain>, log: slog::Logger) -> Self {
|
||||
Self {
|
||||
chain,
|
||||
partials: HashMap::new(),
|
||||
log,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_new_block(&self, block_root: &Hash256) -> bool {
|
||||
self.chain
|
||||
.is_new_block_root(&block_root)
|
||||
.unwrap_or_else(|_| {
|
||||
error!(self.log, "Unable to determine if block is new.");
|
||||
true
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the index of the first new root in the list of block roots.
|
||||
pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option<usize> {
|
||||
for root in roots {
|
||||
println!("root {}", root.block_root);
|
||||
}
|
||||
roots
|
||||
.iter()
|
||||
.position(|brs| self.is_new_block(&brs.block_root))
|
||||
}
|
||||
|
||||
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
|
||||
/// which we should use to request `BeaconBlockBodies`.
|
||||
///
|
||||
/// If a `header` is not in the queue and has not been processed by the chain it is added to
|
||||
/// the queue and it's block root is included in the output.
|
||||
///
|
||||
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
|
||||
/// included in the output and the `inserted` time for the partial record is set to
|
||||
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
|
||||
pub fn enqueue_headers(&mut self, headers: Vec<BeaconBlockHeader>) -> Vec<Hash256> {
|
||||
let mut required_bodies: Vec<Hash256> = vec![];
|
||||
|
||||
for header in headers {
|
||||
let block_root = Hash256::from_slice(&header.hash_tree_root()[..]);
|
||||
|
||||
if self.is_new_block(&block_root) {
|
||||
self.insert_partial(block_root, header);
|
||||
required_bodies.push(block_root)
|
||||
}
|
||||
}
|
||||
|
||||
required_bodies
|
||||
}
|
||||
|
||||
fn insert_partial(&mut self, block_root: Hash256, header: BeaconBlockHeader) {
|
||||
self.partials.insert(
|
||||
header.block_body_root,
|
||||
PartialBeaconBlock {
|
||||
block_root,
|
||||
header,
|
||||
inserted: Instant::now(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PartialBeaconBlock {
|
||||
pub block_root: Hash256,
|
||||
pub header: BeaconBlockHeader,
|
||||
pub inserted: Instant,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user