mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 04:37:13 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states
This commit is contained in:
@@ -11,7 +11,6 @@ matches = "0.1.8"
|
||||
exit-future = "0.2.0"
|
||||
slog-term = "2.6.0"
|
||||
slog-async = "2.5.0"
|
||||
logging = { path = "../../common/logging" }
|
||||
environment = { path = "../../lighthouse/environment" }
|
||||
|
||||
[dependencies]
|
||||
@@ -35,6 +34,7 @@ fnv = "1.0.7"
|
||||
rlp = "0.5.0"
|
||||
lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
|
||||
logging = { path = "../../common/logging" }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
igd = "0.11.1"
|
||||
itertools = "0.10.0"
|
||||
|
||||
@@ -38,25 +38,27 @@
|
||||
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
|
||||
//! task.
|
||||
|
||||
use crate::sync::manager::BlockProcessType;
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
use beacon_chain::parking_lot::Mutex;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use futures::task::Poll;
|
||||
use lighthouse_network::{
|
||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
|
||||
};
|
||||
use logging::TimeLatch;
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::task::Context;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
use std::{cmp, collections::HashSet};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::sync::mpsc;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
|
||||
@@ -73,7 +75,7 @@ mod work_reprocessing_queue;
|
||||
mod worker;
|
||||
|
||||
use crate::beacon_processor::work_reprocessing_queue::QueuedBlock;
|
||||
pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId};
|
||||
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
|
||||
|
||||
/// The maximum size of the channel for work events to the `BeaconProcessor`.
|
||||
///
|
||||
@@ -159,9 +161,6 @@ const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
|
||||
/// The name of the worker tokio tasks.
|
||||
const WORKER_TASK_NAME: &str = "beacon_processor_worker";
|
||||
|
||||
/// The minimum interval between log messages indicating that a queue is full.
|
||||
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single
|
||||
/// batch.
|
||||
///
|
||||
@@ -198,10 +197,6 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
||||
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
||||
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
||||
|
||||
/// Used to send/receive results from a rpc block import in a blocking task.
|
||||
pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
|
||||
pub type BlockResultReceiver<E> = oneshot::Receiver<Result<Hash256, BlockError<E>>>;
|
||||
|
||||
/// A simple first-in-first-out queue with a maximum length.
|
||||
struct FifoQueue<T> {
|
||||
queue: VecDeque<T>,
|
||||
@@ -498,18 +493,22 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
/// sent to the other side of `result_tx`.
|
||||
pub fn rpc_beacon_block(
|
||||
block: Box<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> (Self, BlockResultReceiver<T::EthSpec>) {
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
let event = Self {
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::RpcBlock { block, result_tx },
|
||||
};
|
||||
(event, result_rx)
|
||||
work: Work::RpcBlock {
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new work event to import `blocks` as a beacon chain segment.
|
||||
pub fn chain_segment(
|
||||
process_id: ProcessId,
|
||||
process_id: ChainSegmentProcessId,
|
||||
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -694,10 +693,11 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
},
|
||||
RpcBlock {
|
||||
block: Box<SignedBeaconBlock<T::EthSpec>>,
|
||||
result_tx: BlockResultSender<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
},
|
||||
ChainSegment {
|
||||
process_id: ProcessId,
|
||||
process_id: ChainSegmentProcessId,
|
||||
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||
},
|
||||
Status {
|
||||
@@ -742,25 +742,6 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides de-bounce functionality for logging.
|
||||
#[derive(Default)]
|
||||
struct TimeLatch(Option<Instant>);
|
||||
|
||||
impl TimeLatch {
|
||||
/// Only returns true once every `LOG_DEBOUNCE_INTERVAL`.
|
||||
fn elapsed(&mut self) -> bool {
|
||||
let now = Instant::now();
|
||||
|
||||
let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time);
|
||||
|
||||
if is_elapsed || self.0.is_none() {
|
||||
self.0 = Some(now + LOG_DEBOUNCE_INTERVAL);
|
||||
}
|
||||
|
||||
is_elapsed
|
||||
}
|
||||
}
|
||||
|
||||
/// Unifies all the messages processed by the `BeaconProcessor`.
|
||||
enum InboundEvent<T: BeaconChainTypes> {
|
||||
/// A worker has completed a task and is free.
|
||||
@@ -1509,10 +1490,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
/*
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock { block, result_tx } => {
|
||||
Work::RpcBlock {
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
} => {
|
||||
worker.process_rpc_block(
|
||||
*block,
|
||||
result_tx,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
work_reprocessing_tx.clone(),
|
||||
duplicate_cache,
|
||||
);
|
||||
|
||||
@@ -240,7 +240,13 @@ impl TestRig {
|
||||
}
|
||||
|
||||
pub fn enqueue_rpc_block(&self) {
|
||||
let (event, _rx) = WorkEvent::rpc_beacon_block(Box::new(self.next_block.clone()));
|
||||
let event = WorkEvent::rpc_beacon_block(
|
||||
Box::new(self.next_block.clone()),
|
||||
std::time::Duration::default(),
|
||||
BlockProcessType::ParentLookup {
|
||||
chain_hash: Hash256::random(),
|
||||
},
|
||||
);
|
||||
self.beacon_processor_tx.try_send(event).unwrap();
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use fnv::FnvHashMap;
|
||||
use futures::task::Poll;
|
||||
use futures::{Stream, StreamExt};
|
||||
use lighthouse_network::{MessageId, PeerId};
|
||||
use logging::TimeLatch;
|
||||
use slog::{crit, debug, error, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@@ -133,6 +134,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
|
||||
/* Aux */
|
||||
/// Next attestation id, used for both aggregated and unaggregated attestations
|
||||
next_attestation: usize,
|
||||
early_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -223,6 +226,8 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
|
||||
queued_unaggregates: FnvHashMap::default(),
|
||||
awaiting_attestations_per_root: HashMap::new(),
|
||||
next_attestation: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
};
|
||||
|
||||
executor.spawn(
|
||||
@@ -261,12 +266,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
|
||||
// Check to ensure this won't over-fill the queue.
|
||||
if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
|
||||
warn!(
|
||||
log,
|
||||
"Early blocks queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
if self.early_block_debounce.elapsed() {
|
||||
warn!(
|
||||
log,
|
||||
"Early blocks queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
}
|
||||
// Drop the block.
|
||||
return;
|
||||
}
|
||||
@@ -306,12 +313,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
}
|
||||
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => {
|
||||
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
|
||||
error!(
|
||||
log,
|
||||
"Aggregate attestation delay queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
if self.attestation_delay_debounce.elapsed() {
|
||||
error!(
|
||||
log,
|
||||
"Aggregate attestation delay queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
}
|
||||
// Drop the attestation.
|
||||
return;
|
||||
}
|
||||
@@ -337,12 +346,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
}
|
||||
InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => {
|
||||
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
|
||||
error!(
|
||||
log,
|
||||
"Attestation delay queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
if self.attestation_delay_debounce.elapsed() {
|
||||
error!(
|
||||
log,
|
||||
"Attestation delay queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
}
|
||||
// Drop the attestation.
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -772,6 +772,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
// TODO(merge): reconsider peer scoring for this event.
|
||||
Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(_)))
|
||||
| Err(e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::UnverifiedNonOptimisticCandidate))
|
||||
| Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::NoExecutionConnection)) => {
|
||||
debug!(self.log, "Could not verify block for gossip, ignoring the block";
|
||||
"error" => %e);
|
||||
@@ -1556,7 +1557,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
/*
|
||||
* The block indicated by the target root is not known to us.
|
||||
*
|
||||
* We should always get `AttnError::UnknwonHeadBlock` before we get this
|
||||
* We should always get `AttnError::UnknownHeadBlock` before we get this
|
||||
* error, so this means we can get this error if:
|
||||
*
|
||||
* 1. The target root does not represent a valid block.
|
||||
@@ -1565,7 +1566,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
* For (2), we should only be processing attestations when we should have
|
||||
* all the available information. Note: if we do a weak-subjectivity sync
|
||||
* it's possible that this situation could occur, but I think it's
|
||||
* unlikely. For now, we will declare this to be an invalid message>
|
||||
* unlikely. For now, we will declare this to be an invalid message.
|
||||
*
|
||||
* The peer has published an invalid consensus message.
|
||||
*/
|
||||
@@ -1712,14 +1713,12 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
AttnError::HeadBlockFinalized { beacon_block_root } => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Rejected attestation to finalized block";
|
||||
"Ignored attestation to finalized block";
|
||||
"block_root" => ?beacon_block_root,
|
||||
"attestation_slot" => failed_att.attestation().data.slot,
|
||||
);
|
||||
|
||||
// We have to reject the message as it isn't a descendant of the finalized
|
||||
// checkpoint.
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
|
||||
// The peer that sent us this could be a lagger, or a spammer, or this failure could
|
||||
// be due to us processing attestations extremely slowly. Don't be too harsh.
|
||||
|
||||
@@ -10,7 +10,7 @@ mod rpc_methods;
|
||||
mod sync_methods;
|
||||
|
||||
pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage};
|
||||
pub use sync_methods::ProcessId;
|
||||
pub use sync_methods::ChainSegmentProcessId;
|
||||
|
||||
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
|
||||
|
||||
|
||||
@@ -1,26 +1,28 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
|
||||
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
|
||||
use crate::beacon_processor::{BlockResultSender, DuplicateCache};
|
||||
use crate::beacon_processor::DuplicateCache;
|
||||
use crate::metrics;
|
||||
use crate::sync::manager::{SyncMessage, SyncRequestType};
|
||||
use crate::sync::manager::{BlockProcessType, SyncMessage};
|
||||
use crate::sync::{BatchProcessResult, ChainId};
|
||||
use beacon_chain::{
|
||||
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
|
||||
};
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
use slog::{crit, debug, error, info, trace, warn};
|
||||
use lighthouse_network::PeerAction;
|
||||
use slog::{debug, error, info, trace, warn};
|
||||
use tokio::sync::mpsc;
|
||||
use types::{Epoch, Hash256, SignedBeaconBlock};
|
||||
|
||||
/// Id associated to a block processing request, either a batch or a single block.
|
||||
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ProcessId {
|
||||
pub enum ChainSegmentProcessId {
|
||||
/// Processing Id of a range syncing batch.
|
||||
RangeBatchId(ChainId, Epoch),
|
||||
/// Processing ID for a backfill syncing batch.
|
||||
BackSyncBatchId(Epoch),
|
||||
/// Processing Id of the parent lookup of a block.
|
||||
ParentLookup(PeerId, Hash256),
|
||||
ParentLookup(Hash256),
|
||||
}
|
||||
|
||||
/// Returned when a chain segment import fails.
|
||||
@@ -32,88 +34,77 @@ struct ChainSegmentFailed {
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Worker<T> {
|
||||
/// Attempt to process a block received from a direct RPC request, returning the processing
|
||||
/// result on the `result_tx` channel.
|
||||
///
|
||||
/// Raises a log if there are errors publishing the result to the channel.
|
||||
/// Attempt to process a block received from a direct RPC request.
|
||||
pub fn process_rpc_block(
|
||||
self,
|
||||
block: SignedBeaconBlock<T::EthSpec>,
|
||||
result_tx: BlockResultSender<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
|
||||
duplicate_cache: DuplicateCache,
|
||||
) {
|
||||
let block_root = block.canonical_root();
|
||||
// Checks if the block is already being imported through another source
|
||||
if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
|
||||
let slot = block.slot();
|
||||
let block_result = self.chain.process_block(block);
|
||||
// Check if the block is already being imported through another source
|
||||
let handle = match duplicate_cache.check_and_insert(block.canonical_root()) {
|
||||
Some(handle) => handle,
|
||||
None => {
|
||||
// Sync handles these results
|
||||
self.send_sync_message(SyncMessage::BlockProcessed {
|
||||
process_type,
|
||||
result: Err(BlockError::BlockIsAlreadyKnown),
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
let slot = block.slot();
|
||||
let result = self.chain.process_block(block);
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
if let Ok(root) = &block_result {
|
||||
info!(
|
||||
self.log,
|
||||
"New RPC block received";
|
||||
"slot" => slot,
|
||||
"hash" => %root
|
||||
// RPC block imported, regardless of process type
|
||||
if let &Ok(hash) = &result {
|
||||
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
|
||||
|
||||
// Trigger processing for work referencing this block.
|
||||
let reprocess_msg = ReprocessQueueMessage::BlockImported(hash);
|
||||
if reprocess_tx.try_send(reprocess_msg).is_err() {
|
||||
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
|
||||
};
|
||||
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
|
||||
self.chain.block_times_cache.write().set_time_observed(
|
||||
hash,
|
||||
slot,
|
||||
seen_timestamp,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
if reprocess_tx
|
||||
.try_send(ReprocessQueueMessage::BlockImported(*root))
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to inform block import";
|
||||
"source" => "rpc",
|
||||
"block_root" => %root,
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
if result_tx.send(block_result).is_err() {
|
||||
crit!(self.log, "Failed return sync block result");
|
||||
}
|
||||
// Drop the handle to remove the entry from the cache
|
||||
drop(handle);
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"Gossip block is being imported";
|
||||
"block_root" => %block_root,
|
||||
);
|
||||
// The gossip block that is being imported should eventually
|
||||
// trigger reprocessing of queued attestations once it is imported.
|
||||
// If the gossip block fails import, then it will be downscored
|
||||
// appropriately in `process_gossip_block`.
|
||||
|
||||
// Here, we assume that the block will eventually be imported and
|
||||
// send a `BlockIsAlreadyKnown` message to sync.
|
||||
if result_tx
|
||||
.send(Err(BlockError::BlockIsAlreadyKnown))
|
||||
.is_err()
|
||||
{
|
||||
crit!(self.log, "Failed return sync block result");
|
||||
self.run_fork_choice()
|
||||
}
|
||||
}
|
||||
// Sync handles these results
|
||||
self.send_sync_message(SyncMessage::BlockProcessed {
|
||||
process_type,
|
||||
result: result.map(|_| ()),
|
||||
});
|
||||
|
||||
// Drop the handle to remove the entry from the cache
|
||||
drop(handle);
|
||||
}
|
||||
|
||||
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
|
||||
/// thread if more blocks are needed to process it.
|
||||
pub fn process_chain_segment(
|
||||
&self,
|
||||
process_id: ProcessId,
|
||||
sync_type: ChainSegmentProcessId,
|
||||
downloaded_blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
match process_id {
|
||||
let result = match sync_type {
|
||||
// this is a request from the range sync
|
||||
ProcessId::RangeBatchId(chain_id, epoch) => {
|
||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
|
||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||
let sent_blocks = downloaded_blocks.len();
|
||||
|
||||
let result = match self.process_blocks(downloaded_blocks.iter()) {
|
||||
match self.process_blocks(downloaded_blocks.iter()) {
|
||||
(_, Ok(_)) => {
|
||||
debug!(self.log, "Batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
@@ -139,19 +130,15 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
peer_action: e.peer_action,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let sync_type = SyncRequestType::RangeSync(epoch, chain_id);
|
||||
|
||||
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
|
||||
}
|
||||
}
|
||||
// this is a request from the backfill sync
|
||||
ProcessId::BackSyncBatchId(epoch) => {
|
||||
ChainSegmentProcessId::BackSyncBatchId(epoch) => {
|
||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||
let sent_blocks = downloaded_blocks.len();
|
||||
|
||||
let result = match self.process_backfill_blocks(&downloaded_blocks) {
|
||||
match self.process_backfill_blocks(&downloaded_blocks) {
|
||||
(_, Ok(_)) => {
|
||||
debug!(self.log, "Backfill batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
@@ -173,35 +160,34 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
peer_action: e.peer_action,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let sync_type = SyncRequestType::BackFillSync(epoch);
|
||||
|
||||
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
|
||||
}
|
||||
}
|
||||
// this is a parent lookup request from the sync manager
|
||||
ProcessId::ParentLookup(peer_id, chain_head) => {
|
||||
ChainSegmentProcessId::ParentLookup(chain_head) => {
|
||||
debug!(
|
||||
self.log, "Processing parent lookup";
|
||||
"last_peer_id" => %peer_id,
|
||||
"chain_hash" => %chain_head,
|
||||
"blocks" => downloaded_blocks.len()
|
||||
);
|
||||
// parent blocks are ordered from highest slot to lowest, so we need to process in
|
||||
// reverse
|
||||
match self.process_blocks(downloaded_blocks.iter().rev()) {
|
||||
(_, Err(e)) => {
|
||||
debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => %e.message);
|
||||
self.send_sync_message(SyncMessage::ParentLookupFailed {
|
||||
peer_id,
|
||||
chain_head,
|
||||
})
|
||||
(imported_blocks, Err(e)) => {
|
||||
debug!(self.log, "Parent lookup failed"; "error" => %e.message);
|
||||
BatchProcessResult::Failed {
|
||||
imported_blocks: imported_blocks > 0,
|
||||
peer_action: e.peer_action,
|
||||
}
|
||||
}
|
||||
(_, Ok(_)) => {
|
||||
(imported_blocks, Ok(_)) => {
|
||||
debug!(self.log, "Parent lookup processed successfully");
|
||||
BatchProcessResult::Success(imported_blocks > 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
|
||||
}
|
||||
|
||||
/// Helper function to process batches of blocks, which only consumes the chain and the blocks to process.
|
||||
|
||||
@@ -286,6 +286,14 @@ lazy_static! {
|
||||
"Number of Syncing chains in range, per range type",
|
||||
&["range_type"]
|
||||
);
|
||||
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
|
||||
"sync_single_block_lookups",
|
||||
"Number of single block lookups underway"
|
||||
);
|
||||
pub static ref SYNC_PARENT_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
|
||||
"sync_parent_block_lookups",
|
||||
"Number of parent block lookups underway"
|
||||
);
|
||||
|
||||
/*
|
||||
* Block Delay Metrics
|
||||
|
||||
@@ -8,12 +8,11 @@
|
||||
mod processor;
|
||||
|
||||
use crate::error;
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::{
|
||||
rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request,
|
||||
Response,
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
|
||||
};
|
||||
use processor::Processor;
|
||||
use slog::{debug, o, trace};
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use crate::beacon_processor::{
|
||||
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
|
||||
};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use lighthouse_network::rpc::*;
|
||||
@@ -100,8 +101,11 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
/// this function notifies the sync manager of the error.
|
||||
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) {
|
||||
// Check if the failed RPC belongs to sync
|
||||
if let RequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RPCError(peer_id, id));
|
||||
if let RequestId::Sync(request_id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,24 +180,28 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Box<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
|
||||
unreachable!("Block lookups do not request BBRange requests")
|
||||
}
|
||||
id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
if let RequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::BlocksByRangeResponse {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
beacon_block,
|
||||
});
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"All blocks by range responses should belong to sync"
|
||||
);
|
||||
}
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` response from the peer.
|
||||
@@ -203,25 +211,27 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Box<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
|
||||
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
|
||||
unreachable!("Batch syncing do not request BBRoot requests")
|
||||
}
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
if let RequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::BlocksByRootResponse {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"All Blocks by Root responses should belong to sync"
|
||||
)
|
||||
}
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Process a gossip message declaring a new block.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::sync::manager::RequestId as SyncId;
|
||||
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
|
||||
use crate::router::{Router, RouterMessage};
|
||||
use crate::subnet_service::SyncCommitteeService;
|
||||
@@ -14,7 +15,7 @@ use lighthouse_network::{
|
||||
prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
|
||||
};
|
||||
use lighthouse_network::{
|
||||
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
|
||||
rpc::{GoodbyeReason, RPCResponseErrorCode},
|
||||
Context, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request,
|
||||
Response, Subnet,
|
||||
};
|
||||
@@ -42,6 +43,13 @@ const SUBSCRIBE_DELAY_SLOTS: u64 = 2;
|
||||
/// Delay after a fork where we unsubscribe from pre-fork topics.
|
||||
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
|
||||
|
||||
/// Application level requests sent to the network.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum RequestId {
|
||||
Sync(SyncId),
|
||||
Router,
|
||||
}
|
||||
|
||||
/// Types of messages that the network service can receive.
|
||||
#[derive(Debug)]
|
||||
pub enum NetworkMessage<T: EthSpec> {
|
||||
@@ -112,7 +120,7 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
||||
/// A reference to the underlying beacon chain.
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
/// The underlying libp2p service that drives all the network interactions.
|
||||
libp2p: LibP2PService<T::EthSpec>,
|
||||
libp2p: LibP2PService<RequestId, T::EthSpec>,
|
||||
/// An attestation and subnet manager service.
|
||||
attestation_service: AttestationService<T>,
|
||||
/// A sync committee subnet manager service.
|
||||
@@ -389,7 +397,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
/// Handle an event received from the network.
|
||||
async fn on_libp2p_event(
|
||||
&mut self,
|
||||
ev: Libp2pEvent<T::EthSpec>,
|
||||
ev: Libp2pEvent<RequestId, T::EthSpec>,
|
||||
shutdown_sender: &mut Sender<ShutdownReason>,
|
||||
) {
|
||||
match ev {
|
||||
|
||||
@@ -59,6 +59,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let mut config = NetworkConfig::default();
|
||||
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
|
||||
config.libp2p_port = 21212;
|
||||
config.upnp_enabled = false;
|
||||
config.discovery_port = 21212;
|
||||
|
||||
@@ -8,9 +8,8 @@
|
||||
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
|
||||
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
|
||||
|
||||
use super::RequestId;
|
||||
use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
|
||||
use crate::sync::manager::BatchProcessResult;
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
|
||||
use crate::sync::manager::{BatchProcessResult, Id};
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchState};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
@@ -357,7 +356,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
) -> Result<(), BackFillError> {
|
||||
if let Some(batch) = self.batches.get_mut(&batch_id) {
|
||||
// A batch could be retried without the peer failing the request (disconnecting/
|
||||
@@ -392,7 +391,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Result<ProcessResult, BackFillError> {
|
||||
// check if we have this batch
|
||||
@@ -535,7 +534,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
Ok(v) => v,
|
||||
};
|
||||
|
||||
let process_id = ProcessId::BackSyncBatchId(batch_id);
|
||||
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
|
||||
self.current_processing_batch = Some(batch_id);
|
||||
|
||||
if let Err(e) = self
|
||||
|
||||
637
beacon_node/network/src/sync/block_lookups/mod.rs
Normal file
637
beacon_node/network/src/sync/block_lookups/mod.rs
Normal file
@@ -0,0 +1,637 @@
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::time::Duration;
|
||||
|
||||
use beacon_chain::{BeaconChainTypes, BlockError};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
use lru_cache::LRUCache;
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use smallvec::SmallVec;
|
||||
use store::{Hash256, SignedBeaconBlock};
|
||||
use strum::AsStaticRef;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
|
||||
use crate::metrics;
|
||||
|
||||
use self::{
|
||||
parent_lookup::{ParentLookup, VerifyError},
|
||||
single_block_lookup::SingleBlockRequest,
|
||||
};
|
||||
|
||||
use super::BatchProcessResult;
|
||||
use super::{
|
||||
manager::{BlockProcessType, Id},
|
||||
network_context::SyncNetworkContext,
|
||||
};
|
||||
|
||||
mod parent_lookup;
|
||||
mod single_block_lookup;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Capacity of the LRU cache that remembers the chain hashes of failed parent lookups.
const FAILED_CHAINS_CACHE_SIZE: usize = 500;

/// Value used for the `MAX_ATTEMPTS` parameter of every single block lookup request.
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
|
||||
|
||||
pub(crate) struct BlockLookups<T: BeaconChainTypes> {
|
||||
/// A collection of parent block lookups.
|
||||
parent_queue: SmallVec<[ParentLookup<T::EthSpec>; 3]>,
|
||||
|
||||
/// A cache of failed chain lookups to prevent duplicate searches.
|
||||
failed_chains: LRUCache<Hash256>,
|
||||
|
||||
/// A collection of block hashes being searched for and a flag indicating if a result has been
|
||||
/// received or not.
|
||||
///
|
||||
/// The flag allows us to determine if the peer returned data or sent us nothing.
|
||||
single_block_lookups: FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
|
||||
|
||||
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
|
||||
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
|
||||
|
||||
/// The logger for the import manager.
|
||||
log: Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
pub fn new(beacon_processor_send: mpsc::Sender<WorkEvent<T>>, log: Logger) -> Self {
|
||||
Self {
|
||||
parent_queue: Default::default(),
|
||||
failed_chains: LRUCache::new(FAILED_CHAINS_CACHE_SIZE),
|
||||
single_block_lookups: Default::default(),
|
||||
beacon_processor_send,
|
||||
log,
|
||||
}
|
||||
}
|
||||
|
||||
/* Lookup requests */
|
||||
|
||||
pub fn search_block(
|
||||
&mut self,
|
||||
hash: Hash256,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
// Do not re-request a block that is already being requested
|
||||
if self
|
||||
.single_block_lookups
|
||||
.values_mut()
|
||||
.any(|single_block_request| single_block_request.add_peer(&hash, &peer_id))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Searching for block";
|
||||
"peer_id" => %peer_id,
|
||||
"block" => %hash
|
||||
);
|
||||
|
||||
let mut single_block_request = SingleBlockRequest::new(hash, peer_id);
|
||||
|
||||
let (peer_id, request) = single_block_request.request_block().unwrap();
|
||||
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
|
||||
self.single_block_lookups
|
||||
.insert(request_id, single_block_request);
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
|
||||
self.single_block_lookups.len() as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn search_parent(
|
||||
&mut self,
|
||||
block: Box<SignedBeaconBlock<T::EthSpec>>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let block_root = block.canonical_root();
|
||||
let parent_root = block.parent_root();
|
||||
// If this block or it's parent is part of a known failed chain, ignore it.
|
||||
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
|
||||
debug!(self.log, "Block is from a past failed chain. Dropping";
|
||||
"block_root" => ?block_root, "block_slot" => block.slot());
|
||||
return;
|
||||
}
|
||||
|
||||
// Make sure this block is not already downloaded, and that neither it or its parent is
|
||||
// being searched for.
|
||||
if self.parent_queue.iter_mut().any(|parent_req| {
|
||||
parent_req.contains_block(&block)
|
||||
|| parent_req.add_peer(&block_root, &peer_id)
|
||||
|| parent_req.add_peer(&parent_root, &peer_id)
|
||||
}) {
|
||||
// we are already searching for this block, ignore it
|
||||
return;
|
||||
}
|
||||
|
||||
let parent_lookup = ParentLookup::new(*block, peer_id);
|
||||
self.request_parent(parent_lookup, cx);
|
||||
}
|
||||
|
||||
/* Lookup responses */
|
||||
|
||||
pub fn single_block_lookup_response(
|
||||
&mut self,
|
||||
id: Id,
|
||||
peer_id: PeerId,
|
||||
block: Option<Box<SignedBeaconBlock<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let mut request = match self.single_block_lookups.entry(id) {
|
||||
Entry::Occupied(req) => req,
|
||||
Entry::Vacant(_) => {
|
||||
if block.is_some() {
|
||||
crit!(
|
||||
self.log,
|
||||
"Block returned for single block lookup not present"
|
||||
);
|
||||
#[cfg(debug_assertions)]
|
||||
panic!("block returned for single block lookup not present");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match request.get_mut().verify_block(block) {
|
||||
Ok(Some(block)) => {
|
||||
// This is the correct block, send it for processing
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlock { id },
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
// Remove to avoid inconsistencies
|
||||
self.single_block_lookups.remove(&id);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// request finished correctly, it will be removed after the block is processed.
|
||||
}
|
||||
Err(error) => {
|
||||
let msg: &str = error.as_static();
|
||||
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
|
||||
// Remove the request, if it can be retried it will be added with a new id.
|
||||
let mut req = request.remove();
|
||||
|
||||
debug!(self.log, "Single block lookup failed";
|
||||
"peer_id" => %peer_id, "error" => msg, "block_root" => %req.hash);
|
||||
// try the request again if possible
|
||||
if let Ok((peer_id, request)) = req.request_block() {
|
||||
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
|
||||
self.single_block_lookups.insert(id, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
|
||||
self.single_block_lookups.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn parent_lookup_response(
|
||||
&mut self,
|
||||
id: Id,
|
||||
peer_id: PeerId,
|
||||
block: Option<Box<SignedBeaconBlock<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let mut parent_lookup = if let Some(pos) = self
|
||||
.parent_queue
|
||||
.iter()
|
||||
.position(|request| request.pending_response(id))
|
||||
{
|
||||
self.parent_queue.remove(pos)
|
||||
} else {
|
||||
if block.is_some() {
|
||||
debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id);
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
match parent_lookup.verify_block(block, &self.failed_chains) {
|
||||
Ok(Some(block)) => {
|
||||
// Block is correct, send to the beacon processor.
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
self.parent_queue.push(parent_lookup)
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// Request finished successfully, nothing else to do. It will be removed after the
|
||||
// processing result arrives.
|
||||
self.parent_queue.push(parent_lookup);
|
||||
}
|
||||
Err(e) => match e {
|
||||
VerifyError::RootMismatch
|
||||
| VerifyError::NoBlockReturned
|
||||
| VerifyError::ExtraBlocksReturned => {
|
||||
let e = e.as_static();
|
||||
warn!(self.log, "Peer sent invalid response to parent request.";
|
||||
"peer_id" => %peer_id, "reason" => e);
|
||||
|
||||
// We do not tolerate these kinds of errors. We will accept a few but these are signs
|
||||
// of a faulty peer.
|
||||
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
|
||||
|
||||
// We try again if possible.
|
||||
self.request_parent(parent_lookup, cx);
|
||||
}
|
||||
VerifyError::PreviousFailure { parent_root } => {
|
||||
self.failed_chains.insert(parent_lookup.chain_hash());
|
||||
debug!(
|
||||
self.log,
|
||||
"Parent chain ignored due to past failure";
|
||||
"block" => %parent_root,
|
||||
);
|
||||
// Add the root block to failed chains
|
||||
self.failed_chains.insert(parent_lookup.chain_hash());
|
||||
|
||||
cx.report_peer(
|
||||
peer_id,
|
||||
PeerAction::MidToleranceError,
|
||||
"bbroot_failed_chains",
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
|
||||
self.parent_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
/* Error responses */
|
||||
|
||||
#[allow(clippy::needless_collect)] // false positive
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T::EthSpec>) {
|
||||
/* Check disconnection for single block lookups */
|
||||
// better written after https://github.com/rust-lang/rust/issues/59618
|
||||
let remove_retry_ids: Vec<Id> = self
|
||||
.single_block_lookups
|
||||
.iter_mut()
|
||||
.filter_map(|(id, req)| {
|
||||
if req.check_peer_disconnected(peer_id).is_err() {
|
||||
Some(*id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for mut req in remove_retry_ids
|
||||
.into_iter()
|
||||
.map(|id| self.single_block_lookups.remove(&id).unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
// retry the request
|
||||
match req.request_block() {
|
||||
Ok((peer_id, block_request)) => {
|
||||
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
|
||||
self.single_block_lookups.insert(request_id, req);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
trace!(self.log, "Single block request failed on peer disconnection";
|
||||
"block_root" => %req.hash, "peer_id" => %peer_id, "reason" => e.as_static());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Check disconnection for parent lookups */
|
||||
while let Some(pos) = self
|
||||
.parent_queue
|
||||
.iter_mut()
|
||||
.position(|req| req.check_peer_disconnected(peer_id).is_err())
|
||||
{
|
||||
let parent_lookup = self.parent_queue.remove(pos);
|
||||
trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup);
|
||||
self.request_parent(parent_lookup, cx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parent_lookup_failed(
|
||||
&mut self,
|
||||
id: Id,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
if let Some(pos) = self
|
||||
.parent_queue
|
||||
.iter()
|
||||
.position(|request| request.pending_response(id))
|
||||
{
|
||||
let mut parent_lookup = self.parent_queue.remove(pos);
|
||||
parent_lookup.download_failed();
|
||||
trace!(self.log, "Parent lookup request failed"; &parent_lookup);
|
||||
self.request_parent(parent_lookup, cx);
|
||||
} else {
|
||||
return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id);
|
||||
};
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
|
||||
self.parent_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext<T::EthSpec>) {
|
||||
if let Some(mut request) = self.single_block_lookups.remove(&id) {
|
||||
request.register_failure();
|
||||
trace!(self.log, "Single block lookup failed"; "block" => %request.hash);
|
||||
if let Ok((peer_id, block_request)) = request.request_block() {
|
||||
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
|
||||
self.single_block_lookups.insert(request_id, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
|
||||
self.single_block_lookups.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
/* Processing responses */
|
||||
|
||||
pub fn single_block_processed(
|
||||
&mut self,
|
||||
id: Id,
|
||||
result: Result<(), BlockError<T::EthSpec>>,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let mut req = match self.single_block_lookups.remove(&id) {
|
||||
Some(req) => req,
|
||||
None => {
|
||||
#[cfg(debug_assertions)]
|
||||
panic!("block processed for single block lookup not present");
|
||||
#[cfg(not(debug_assertions))]
|
||||
return crit!(
|
||||
self.log,
|
||||
"Block processed for single block lookup not present"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let root = req.hash;
|
||||
let peer_id = match req.processing_peer() {
|
||||
Ok(peer) => peer,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
if let Err(e) = &result {
|
||||
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
|
||||
} else {
|
||||
trace!(self.log, "Single block processing succeeded"; "block" => %root);
|
||||
}
|
||||
|
||||
match result {
|
||||
Err(e) => match e {
|
||||
BlockError::BlockIsAlreadyKnown => {
|
||||
// No error here
|
||||
}
|
||||
BlockError::BeaconChainError(e) => {
|
||||
// Internal error
|
||||
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
|
||||
}
|
||||
BlockError::ParentUnknown(block) => {
|
||||
self.search_parent(block, peer_id, cx);
|
||||
}
|
||||
other => {
|
||||
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
|
||||
cx.report_peer(
|
||||
peer_id,
|
||||
PeerAction::MidToleranceError,
|
||||
"single_block_failure",
|
||||
);
|
||||
|
||||
// Try it again if possible.
|
||||
req.register_failure();
|
||||
if let Ok((peer_id, request)) = req.request_block() {
|
||||
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
|
||||
// insert with the new id
|
||||
self.single_block_lookups.insert(request_id, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(()) => {
|
||||
// No error here
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
|
||||
self.single_block_lookups.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn parent_block_processed(
|
||||
&mut self,
|
||||
chain_hash: Hash256,
|
||||
result: Result<(), BlockError<T::EthSpec>>,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self
|
||||
.parent_queue
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(pos, request)| {
|
||||
request
|
||||
.get_processing_peer(chain_hash)
|
||||
.map(|peer| (pos, peer))
|
||||
}) {
|
||||
(self.parent_queue.remove(pos), peer)
|
||||
} else {
|
||||
#[cfg(debug_assertions)]
|
||||
panic!(
|
||||
"Process response for a parent lookup request that was not found. Chain_hash: {}",
|
||||
chain_hash
|
||||
);
|
||||
#[cfg(not(debug_assertions))]
|
||||
return crit!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
|
||||
};
|
||||
|
||||
if let Err(e) = &result {
|
||||
trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e);
|
||||
} else {
|
||||
trace!(self.log, "Parent block processing succeeded"; &parent_lookup);
|
||||
}
|
||||
|
||||
match result {
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
// need to keep looking for parents
|
||||
// add the block back to the queue and continue the search
|
||||
parent_lookup.add_block(*block);
|
||||
self.request_parent(parent_lookup, cx);
|
||||
}
|
||||
Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
let blocks = parent_lookup.chain_blocks();
|
||||
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
|
||||
|
||||
match self
|
||||
.beacon_processor_send
|
||||
.try_send(WorkEvent::chain_segment(process_id, blocks))
|
||||
{
|
||||
Ok(_) => {
|
||||
self.parent_queue.push(parent_lookup);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to send chain segment to processor";
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(outcome) => {
|
||||
// all else we consider the chain a failure and downvote the peer that sent
|
||||
// us the last block
|
||||
warn!(
|
||||
self.log, "Invalid parent chain";
|
||||
"score_adjustment" => %PeerAction::MidToleranceError,
|
||||
"outcome" => ?outcome,
|
||||
"last_peer" => %peer_id,
|
||||
);
|
||||
|
||||
// Add this chain to cache of failed chains
|
||||
self.failed_chains.insert(chain_hash);
|
||||
|
||||
// This currently can be a host of errors. We permit this due to the partial
|
||||
// ambiguity.
|
||||
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
|
||||
self.parent_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn parent_chain_processed(
|
||||
&mut self,
|
||||
chain_hash: Hash256,
|
||||
result: BatchProcessResult,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
let parent_lookup = if let Some(pos) = self
|
||||
.parent_queue
|
||||
.iter()
|
||||
.position(|request| request.chain_hash() == chain_hash)
|
||||
{
|
||||
self.parent_queue.remove(pos)
|
||||
} else {
|
||||
#[cfg(debug_assertions)]
|
||||
panic!(
|
||||
"Chain process response for a parent lookup request that was not found. Chain_hash: {}",
|
||||
chain_hash
|
||||
);
|
||||
#[cfg(not(debug_assertions))]
|
||||
return crit!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
|
||||
};
|
||||
|
||||
debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result);
|
||||
match result {
|
||||
BatchProcessResult::Success(_) => {
|
||||
// nothing to do.
|
||||
}
|
||||
BatchProcessResult::Failed {
|
||||
imported_blocks: _,
|
||||
peer_action,
|
||||
} => {
|
||||
self.failed_chains.insert(parent_lookup.chain_hash());
|
||||
if let Some(peer_action) = peer_action {
|
||||
for &peer_id in parent_lookup.used_peers() {
|
||||
cx.report_peer(peer_id, peer_action, "parent_chain_failure")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
|
||||
self.parent_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
/* Helper functions */
|
||||
|
||||
fn send_block_for_processing(
|
||||
&mut self,
|
||||
block: Box<SignedBeaconBlock<T::EthSpec>>,
|
||||
duration: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Result<(), ()> {
|
||||
trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type);
|
||||
let event = WorkEvent::rpc_beacon_block(block, duration, process_type);
|
||||
if let Err(e) = self.beacon_processor_send.try_send(event) {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to send sync block to processor";
|
||||
"error" => ?e
|
||||
);
|
||||
return Err(());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn request_parent(
|
||||
&mut self,
|
||||
mut parent_lookup: ParentLookup<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T::EthSpec>,
|
||||
) {
|
||||
match parent_lookup.request_parent(cx) {
|
||||
Err(e) => {
|
||||
debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static());
|
||||
match e {
|
||||
parent_lookup::RequestError::SendFailed(_) => {
|
||||
// Probably shutting down, nothing to do here. Drop the request
|
||||
}
|
||||
parent_lookup::RequestError::ChainTooLong
|
||||
| parent_lookup::RequestError::TooManyAttempts => {
|
||||
self.failed_chains.insert(parent_lookup.chain_hash());
|
||||
// This indicates faulty peers.
|
||||
for &peer_id in parent_lookup.used_peers() {
|
||||
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
|
||||
}
|
||||
}
|
||||
parent_lookup::RequestError::NoPeers => {
|
||||
// This happens if the peer disconnects while the block is being
|
||||
// processed. Drop the request without extra penalty
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
debug!(self.log, "Requesting parent"; &parent_lookup);
|
||||
self.parent_queue.push(parent_lookup)
|
||||
}
|
||||
}
|
||||
|
||||
// We remove and add back again requests so we want this updated regardless of outcome.
|
||||
metrics::set_gauge(
|
||||
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
|
||||
self.parent_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
201
beacon_node/network/src/sync/block_lookups/parent_lookup.rs
Normal file
201
beacon_node/network/src/sync/block_lookups/parent_lookup.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
use lighthouse_network::PeerId;
|
||||
use store::{EthSpec, Hash256, SignedBeaconBlock};
|
||||
use strum::AsStaticStr;
|
||||
|
||||
use crate::sync::{
|
||||
manager::{Id, SLOT_IMPORT_TOLERANCE},
|
||||
network_context::SyncNetworkContext,
|
||||
};
|
||||
|
||||
use super::single_block_lookup::{self, SingleBlockRequest};
|
||||
|
||||
/// How many attempts we try to find a parent of a block before we give up trying .
|
||||
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
|
||||
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
|
||||
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
|
||||
/// is further back than the most recent head slot.
|
||||
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
|
||||
|
||||
/// Maintains a sequential list of parents to lookup and the lookup's current state.
|
||||
pub(crate) struct ParentLookup<T: EthSpec> {
|
||||
/// The root of the block triggering this parent request.
|
||||
chain_hash: Hash256,
|
||||
/// The blocks that have currently been downloaded.
|
||||
downloaded_blocks: Vec<SignedBeaconBlock<T>>,
|
||||
/// Request of the last parent.
|
||||
current_parent_request: SingleBlockRequest<PARENT_FAIL_TOLERANCE>,
|
||||
/// Id of the last parent request.
|
||||
current_parent_request_id: Option<Id>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, AsStaticStr)]
|
||||
pub enum VerifyError {
|
||||
RootMismatch,
|
||||
NoBlockReturned,
|
||||
ExtraBlocksReturned,
|
||||
PreviousFailure { parent_root: Hash256 },
|
||||
}
|
||||
|
||||
/// Reasons for which the next parent of a lookup could not be requested.
#[derive(Debug, PartialEq, Eq)]
pub enum RequestError {
    /// The network context failed to send the request; carries the reason it reported.
    SendFailed(&'static str),
    /// The number of downloaded blocks reached `PARENT_DEPTH_TOLERANCE`.
    ChainTooLong,
    /// The current parent request exhausted its attempts.
    TooManyAttempts,
    /// The current parent request has no peers available.
    NoPeers,
}
|
||||
|
||||
impl<T: EthSpec> ParentLookup<T> {
|
||||
pub fn contains_block(&self, block: &SignedBeaconBlock<T>) -> bool {
|
||||
self.downloaded_blocks
|
||||
.iter()
|
||||
.any(|d_block| d_block == block)
|
||||
}
|
||||
|
||||
pub fn new(block: SignedBeaconBlock<T>, peer_id: PeerId) -> Self {
|
||||
let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id);
|
||||
|
||||
Self {
|
||||
chain_hash: block.canonical_root(),
|
||||
downloaded_blocks: vec![block],
|
||||
current_parent_request,
|
||||
current_parent_request_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to request the next unknown parent. If the request fails, it should be removed.
|
||||
pub fn request_parent(&mut self, cx: &mut SyncNetworkContext<T>) -> Result<(), RequestError> {
|
||||
// check to make sure this request hasn't failed
|
||||
if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE {
|
||||
return Err(RequestError::ChainTooLong);
|
||||
}
|
||||
|
||||
let (peer_id, request) = self.current_parent_request.request_block()?;
|
||||
match cx.parent_lookup_request(peer_id, request) {
|
||||
Ok(request_id) => {
|
||||
self.current_parent_request_id = Some(request_id);
|
||||
Ok(())
|
||||
}
|
||||
Err(reason) => {
|
||||
self.current_parent_request_id = None;
|
||||
Err(RequestError::SendFailed(reason))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
|
||||
self.current_parent_request.check_peer_disconnected(peer_id)
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, block: SignedBeaconBlock<T>) {
|
||||
let next_parent = block.parent_root();
|
||||
self.downloaded_blocks.push(block);
|
||||
self.current_parent_request.hash = next_parent;
|
||||
self.current_parent_request.state = single_block_lookup::State::AwaitingDownload;
|
||||
self.current_parent_request_id = None;
|
||||
}
|
||||
|
||||
pub fn pending_response(&self, req_id: Id) -> bool {
|
||||
self.current_parent_request_id == Some(req_id)
|
||||
}
|
||||
|
||||
/// Get the parent lookup's chain hash.
|
||||
pub fn chain_hash(&self) -> Hash256 {
|
||||
self.chain_hash
|
||||
}
|
||||
|
||||
pub fn download_failed(&mut self) {
|
||||
self.current_parent_request.register_failure();
|
||||
self.current_parent_request_id = None;
|
||||
}
|
||||
|
||||
pub fn chain_blocks(&mut self) -> Vec<SignedBeaconBlock<T>> {
|
||||
std::mem::take(&mut self.downloaded_blocks)
|
||||
}
|
||||
|
||||
/// Verifies that the received block is what we requested. If so, parent lookup now waits for
|
||||
/// the processing result of the block.
|
||||
pub fn verify_block(
|
||||
&mut self,
|
||||
block: Option<Box<SignedBeaconBlock<T>>>,
|
||||
failed_chains: &lru_cache::LRUCache<Hash256>,
|
||||
) -> Result<Option<Box<SignedBeaconBlock<T>>>, VerifyError> {
|
||||
let block = self.current_parent_request.verify_block(block)?;
|
||||
|
||||
// check if the parent of this block isn't in the failed cache. If it is, this chain should
|
||||
// be dropped and the peer downscored.
|
||||
if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) {
|
||||
if failed_chains.contains(&parent_root) {
|
||||
self.current_parent_request.register_failure();
|
||||
self.current_parent_request_id = None;
|
||||
return Err(VerifyError::PreviousFailure { parent_root });
|
||||
}
|
||||
}
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub fn get_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {
|
||||
if self.chain_hash == chain_hash {
|
||||
return self.current_parent_request.processing_peer().ok();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn failed_attempts(&self) -> u8 {
|
||||
self.current_parent_request.failed_attempts
|
||||
}
|
||||
|
||||
pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool {
|
||||
self.current_parent_request.add_peer(block_root, peer_id)
|
||||
}
|
||||
|
||||
pub fn used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
|
||||
self.current_parent_request.used_peers.iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::single_block_lookup::VerifyError> for VerifyError {
|
||||
fn from(e: super::single_block_lookup::VerifyError) -> Self {
|
||||
use super::single_block_lookup::VerifyError as E;
|
||||
match e {
|
||||
E::RootMismatch => VerifyError::RootMismatch,
|
||||
E::NoBlockReturned => VerifyError::NoBlockReturned,
|
||||
E::ExtraBlocksReturned => VerifyError::ExtraBlocksReturned,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::single_block_lookup::LookupRequestError> for RequestError {
|
||||
fn from(e: super::single_block_lookup::LookupRequestError) -> Self {
|
||||
use super::single_block_lookup::LookupRequestError as E;
|
||||
match e {
|
||||
E::TooManyAttempts => RequestError::TooManyAttempts,
|
||||
E::NoPeers => RequestError::NoPeers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Logs a parent lookup as its chain hash, the state of its current parent request and the
/// number of downloaded blocks.
impl<T: EthSpec> slog::KV for ParentLookup<T> {
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        let downloaded = self.downloaded_blocks.len();

        serializer.emit_arguments("chain_hash", &format_args!("{}", self.chain_hash))?;
        slog::Value::serialize(&self.current_parent_request, record, "parent", serializer)?;
        serializer.emit_usize("downloaded_blocks", downloaded)?;
        Ok(())
    }
}
|
||||
|
||||
impl RequestError {
|
||||
pub fn as_static(&self) -> &'static str {
|
||||
match self {
|
||||
RequestError::SendFailed(e) => e,
|
||||
RequestError::ChainTooLong => "chain_too_long",
|
||||
RequestError::TooManyAttempts => "too_many_attempts",
|
||||
RequestError::NoPeers => "no_peers",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
|
||||
use rand::seq::IteratorRandom;
|
||||
use ssz_types::VariableList;
|
||||
use store::{EthSpec, Hash256, SignedBeaconBlock};
|
||||
use strum::AsStaticStr;
|
||||
|
||||
/// Object representing a single block lookup request.
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub struct SingleBlockRequest<const MAX_ATTEMPTS: u8> {
|
||||
/// The hash of the requested block.
|
||||
pub hash: Hash256,
|
||||
/// State of this request.
|
||||
pub state: State,
|
||||
/// Peers that should have this block.
|
||||
pub available_peers: HashSet<PeerId>,
|
||||
/// Peers from which we have requested this block.
|
||||
pub used_peers: HashSet<PeerId>,
|
||||
/// How many times have we attempted this block.
|
||||
pub failed_attempts: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum State {
|
||||
AwaitingDownload,
|
||||
Downloading { peer_id: PeerId },
|
||||
Processing { peer_id: PeerId },
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, AsStaticStr)]
|
||||
pub enum VerifyError {
|
||||
RootMismatch,
|
||||
NoBlockReturned,
|
||||
ExtraBlocksReturned,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, AsStaticStr)]
|
||||
pub enum LookupRequestError {
|
||||
TooManyAttempts,
|
||||
NoPeers,
|
||||
}
|
||||
|
||||
impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
|
||||
pub fn new(hash: Hash256, peer_id: PeerId) -> Self {
|
||||
Self {
|
||||
hash,
|
||||
state: State::AwaitingDownload,
|
||||
available_peers: HashSet::from([peer_id]),
|
||||
used_peers: HashSet::default(),
|
||||
failed_attempts: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_failure(&mut self) {
|
||||
self.failed_attempts += 1;
|
||||
self.state = State::AwaitingDownload;
|
||||
}
|
||||
|
||||
pub fn add_peer(&mut self, hash: &Hash256, peer_id: &PeerId) -> bool {
|
||||
let is_useful = &self.hash == hash;
|
||||
if is_useful {
|
||||
self.available_peers.insert(*peer_id);
|
||||
}
|
||||
is_useful
|
||||
}
|
||||
|
||||
/// If a peer disconnects, this request could be failed. If so, an error is returned
|
||||
pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> {
|
||||
self.available_peers.remove(dc_peer_id);
|
||||
if let State::Downloading { peer_id } = &self.state {
|
||||
if peer_id == dc_peer_id {
|
||||
// Peer disconnected before providing a block
|
||||
self.register_failure();
|
||||
return Err(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verifies if the received block matches the requested one.
|
||||
/// Returns the block for processing if the response is what we expected.
|
||||
pub fn verify_block<T: EthSpec>(
|
||||
&mut self,
|
||||
block: Option<Box<SignedBeaconBlock<T>>>,
|
||||
) -> Result<Option<Box<SignedBeaconBlock<T>>>, VerifyError> {
|
||||
match self.state {
|
||||
State::AwaitingDownload => {
|
||||
self.register_failure();
|
||||
Err(VerifyError::ExtraBlocksReturned)
|
||||
}
|
||||
State::Downloading { peer_id } => match block {
|
||||
Some(block) => {
|
||||
if block.canonical_root() != self.hash {
|
||||
// return an error and drop the block
|
||||
self.register_failure();
|
||||
Err(VerifyError::RootMismatch)
|
||||
} else {
|
||||
// Return the block for processing.
|
||||
self.state = State::Processing { peer_id };
|
||||
Ok(Some(block))
|
||||
}
|
||||
}
|
||||
None => {
|
||||
self.register_failure();
|
||||
Err(VerifyError::NoBlockReturned)
|
||||
}
|
||||
},
|
||||
State::Processing { peer_id: _ } => match block {
|
||||
Some(_) => {
|
||||
// We sent the block for processing and received an extra block.
|
||||
self.register_failure();
|
||||
Err(VerifyError::ExtraBlocksReturned)
|
||||
}
|
||||
None => {
|
||||
// This is simply the stream termination and we are already processing the
|
||||
// block
|
||||
Ok(None)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> {
|
||||
debug_assert!(matches!(self.state, State::AwaitingDownload));
|
||||
if self.failed_attempts <= MAX_ATTEMPTS {
|
||||
if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) {
|
||||
let request = BlocksByRootRequest {
|
||||
block_roots: VariableList::from(vec![self.hash]),
|
||||
};
|
||||
self.state = State::Downloading { peer_id };
|
||||
self.used_peers.insert(peer_id);
|
||||
Ok((peer_id, request))
|
||||
} else {
|
||||
Err(LookupRequestError::NoPeers)
|
||||
}
|
||||
} else {
|
||||
Err(LookupRequestError::TooManyAttempts)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn processing_peer(&self) -> Result<PeerId, ()> {
|
||||
if let State::Processing { peer_id } = &self.state {
|
||||
Ok(*peer_id)
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const MAX_ATTEMPTS: u8> slog::Value for SingleBlockRequest<MAX_ATTEMPTS> {
|
||||
fn serialize(
|
||||
&self,
|
||||
record: &slog::Record,
|
||||
key: slog::Key,
|
||||
serializer: &mut dyn slog::Serializer,
|
||||
) -> slog::Result {
|
||||
serializer.emit_str("request", key)?;
|
||||
serializer.emit_arguments("hash", &format_args!("{}", self.hash))?;
|
||||
match &self.state {
|
||||
State::AwaitingDownload => {
|
||||
"awaiting_download".serialize(record, "state", serializer)?
|
||||
}
|
||||
State::Downloading { peer_id } => {
|
||||
serializer.emit_arguments("downloading_peer", &format_args!("{}", peer_id))?
|
||||
}
|
||||
State::Processing { peer_id } => {
|
||||
serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))?
|
||||
}
|
||||
}
|
||||
slog::Result::Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
    use types::MinimalEthSpec as E;

    /// Builds a random signed base block from a fixed seed, so every call yields the same block.
    fn rand_block() -> SignedBeaconBlock<E> {
        let mut rng = XorShiftRng::from_seed([42; 16]);
        let message = types::BeaconBlock::Base(types::BeaconBlockBase {
            ..<_>::random_for_test(&mut rng)
        });
        let signature = types::Signature::random_for_test(&mut rng);
        SignedBeaconBlock::from_block(message, signature)
    }

    /// Requesting a block and receiving exactly that block must yield it back for processing.
    #[test]
    fn test_happy_path() {
        let peer_id = PeerId::random();
        let block = rand_block();
        let root = block.canonical_root();

        let mut request = SingleBlockRequest::<4>::new(root, peer_id);
        request.request_block().unwrap();
        request.verify_block(Some(Box::new(block))).unwrap().unwrap();
    }

    /// Registering a failure on a fresh request must not panic.
    #[test]
    fn test_max_attempts() {
        let peer_id = PeerId::random();
        let block = rand_block();
        let root = block.canonical_root();

        let mut request = SingleBlockRequest::<4>::new(root, peer_id);
        request.register_failure();
    }
}
|
||||
460
beacon_node/network/src/sync/block_lookups/tests.rs
Normal file
460
beacon_node/network/src/sync/block_lookups/tests.rs
Normal file
@@ -0,0 +1,460 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::service::RequestId;
|
||||
use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::NetworkMessage;
|
||||
|
||||
use super::*;
|
||||
|
||||
use beacon_chain::builder::Witness;
|
||||
use beacon_chain::eth1_chain::CachingEth1Backend;
|
||||
use lighthouse_network::{NetworkGlobals, Request};
|
||||
use slog::{Drain, Level};
|
||||
use slot_clock::SystemTimeSlotClock;
|
||||
use store::MemoryStore;
|
||||
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
|
||||
use types::MinimalEthSpec as E;
|
||||
|
||||
type T = Witness<SystemTimeSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
|
||||
|
||||
struct TestRig {
|
||||
beacon_processor_rx: mpsc::Receiver<WorkEvent<T>>,
|
||||
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
||||
rng: XorShiftRng,
|
||||
}
|
||||
|
||||
const D: Duration = Duration::new(0, 0);
|
||||
|
||||
impl TestRig {
|
||||
fn test_setup(log_level: Option<Level>) -> (BlockLookups<T>, SyncNetworkContext<E>, Self) {
|
||||
let log = {
|
||||
let decorator = slog_term::TermDecorator::new().build();
|
||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||
let drain = slog_async::Async::new(drain).build().fuse();
|
||||
|
||||
if let Some(log_level) = log_level {
|
||||
slog::Logger::root(drain.filter_level(log_level).fuse(), slog::o!())
|
||||
} else {
|
||||
slog::Logger::root(drain.filter(|_| false).fuse(), slog::o!())
|
||||
}
|
||||
};
|
||||
|
||||
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(100);
|
||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||
let rng = XorShiftRng::from_seed([42; 16]);
|
||||
let rig = TestRig {
|
||||
beacon_processor_rx,
|
||||
network_rx,
|
||||
rng,
|
||||
};
|
||||
let bl = BlockLookups::new(
|
||||
beacon_processor_tx,
|
||||
log.new(slog::o!("component" => "block_lookups")),
|
||||
);
|
||||
let cx = {
|
||||
let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
|
||||
SyncNetworkContext::new(
|
||||
network_tx,
|
||||
globals,
|
||||
log.new(slog::o!("component" => "network_context")),
|
||||
)
|
||||
};
|
||||
|
||||
(bl, cx, rig)
|
||||
}
|
||||
|
||||
fn rand_block(&mut self) -> SignedBeaconBlock<E> {
|
||||
SignedBeaconBlock::from_block(
|
||||
types::BeaconBlock::Base(types::BeaconBlockBase {
|
||||
..<_>::random_for_test(&mut self.rng)
|
||||
}),
|
||||
types::Signature::random_for_test(&mut self.rng),
|
||||
)
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_block_request(&mut self) -> Id {
|
||||
match self.network_rx.try_recv() {
|
||||
Ok(NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: Request::BlocksByRoot(_request),
|
||||
request_id: RequestId::Sync(SyncId::SingleBlock { id }),
|
||||
}) => id,
|
||||
other => {
|
||||
panic!("Expected block request, found {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_parent_request(&mut self) -> Id {
|
||||
match self.network_rx.try_recv() {
|
||||
Ok(NetworkMessage::SendRequest {
|
||||
peer_id: _,
|
||||
request: Request::BlocksByRoot(_request),
|
||||
request_id: RequestId::Sync(SyncId::ParentLookup { id }),
|
||||
}) => id,
|
||||
other => panic!("Expected parent request, found {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_block_process(&mut self) {
|
||||
match self.beacon_processor_rx.try_recv() {
|
||||
Ok(work) => {
|
||||
assert_eq!(work.work_type(), crate::beacon_processor::RPC_BLOCK);
|
||||
}
|
||||
other => panic!("Expected block process, found {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_parent_chain_process(&mut self) {
|
||||
match self.beacon_processor_rx.try_recv() {
|
||||
Ok(work) => {
|
||||
assert_eq!(work.work_type(), crate::beacon_processor::CHAIN_SEGMENT);
|
||||
}
|
||||
other => panic!("Expected chain segment process, found {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_empty_network(&mut self) {
|
||||
assert_eq!(
|
||||
self.network_rx.try_recv().expect_err("must err"),
|
||||
mpsc::error::TryRecvError::Empty
|
||||
);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn expect_penalty(&mut self) {
|
||||
match self.network_rx.try_recv() {
|
||||
Ok(NetworkMessage::ReportPeer { .. }) => {}
|
||||
other => panic!("Expected peer penalty, found {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block_with_parent(&mut self, parent_root: Hash256) -> SignedBeaconBlock<E> {
|
||||
SignedBeaconBlock::from_block(
|
||||
types::BeaconBlock::Base(types::BeaconBlockBase {
|
||||
parent_root,
|
||||
..<_>::random_for_test(&mut self.rng)
|
||||
}),
|
||||
types::Signature::random_for_test(&mut self.rng),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A single block lookup answered with the right block goes to processing and is removed
/// once processing succeeds, without any peer report.
#[test]
fn test_single_block_lookup_happy_path() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let block = rig.rand_block();
    let peer = PeerId::random();
    let root = block.canonical_root();

    // Start the lookup; a request for the block must go out.
    lookups.search_block(root, peer, &mut network);
    let request_id = rig.expect_block_request();

    // The peer answers with the requested block: no penalty, and the block is queued for
    // processing.
    lookups.single_block_lookup_response(request_id, peer, Some(Box::new(block)), D, &mut network);
    rig.expect_empty_network();
    rig.expect_block_process();

    // While the block is being processed the lookup is still tracked.
    assert_eq!(lookups.single_block_lookups.len(), 1);

    // End of the response followed by a successful processing result: still no penalty, and
    // the lookup is gone.
    lookups.single_block_lookup_response(request_id, peer, None, D, &mut network);
    lookups.single_block_processed(request_id, Ok(()), &mut network);
    rig.expect_empty_network();
    assert_eq!(lookups.single_block_lookups.len(), 0);
}
|
||||
|
||||
/// An empty response to a single block lookup penalizes the peer and triggers a retry.
#[test]
fn test_single_block_lookup_empty_response() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let root = Hash256::random();
    let peer = PeerId::random();

    // Start the lookup.
    lookups.search_block(root, peer, &mut network);
    let request_id = rig.expect_block_request();

    // The response ends without a block: the peer is reported...
    lookups.single_block_lookup_response(request_id, peer, None, D, &mut network);
    rig.expect_penalty();

    // ...and the block is requested again.
    rig.expect_block_request();
}
|
||||
|
||||
/// A block with the wrong root penalizes the peer once and triggers a retry; the end of that
/// same response adds no further penalty.
#[test]
fn test_single_block_lookup_wrong_response() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let root = Hash256::random();
    let peer = PeerId::random();

    // Start the lookup.
    lookups.search_block(root, peer, &mut network);
    let request_id = rig.expect_block_request();

    // The peer answers with an unrelated block: it is reported and the block re-requested.
    let unrelated_block = rig.rand_block();
    lookups.single_block_lookup_response(
        request_id,
        peer,
        Some(Box::new(unrelated_block)),
        D,
        &mut network,
    );
    rig.expect_penalty();
    rig.expect_block_request();

    // The end of the first response must not generate anything else on the network.
    lookups.single_block_lookup_response(request_id, peer, None, D, &mut network);
    rig.expect_empty_network();
}
|
||||
|
||||
/// A failed single block request is retried without reporting the peer.
#[test]
fn test_single_block_lookup_failure() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let root = Hash256::random();
    let peer = PeerId::random();

    // Start the lookup.
    lookups.search_block(root, peer, &mut network);
    let request_id = rig.expect_block_request();

    // The request fails. RPC failures are penalized elsewhere, so the only network traffic
    // expected here is the retry.
    lookups.single_block_lookup_failed(request_id, &mut network);
    rig.expect_block_request();
    rig.expect_empty_network();
}
|
||||
|
||||
/// A single block lookup whose block turns out to have an unknown parent is converted into a
/// parent lookup.
#[test]
fn test_single_block_lookup_becomes_parent_request() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let block = rig.rand_block();
    let peer = PeerId::random();

    // Start the lookup.
    lookups.search_block(block.canonical_root(), peer, &mut network);
    let request_id = rig.expect_block_request();

    // The peer answers with the requested block: no penalty, and the block is queued for
    // processing.
    lookups.single_block_lookup_response(
        request_id,
        peer,
        Some(Box::new(block.clone())),
        D,
        &mut network,
    );
    rig.expect_empty_network();
    rig.expect_block_process();

    // While the block is being processed the lookup is still tracked.
    assert_eq!(lookups.single_block_lookups.len(), 1);

    // Processing reports an unknown parent: the single lookup is dropped, a parent request is
    // sent (and nothing else, so no penalty), and one parent lookup is now queued.
    let unknown_parent = Err(BlockError::ParentUnknown(Box::new(block)));
    lookups.single_block_processed(request_id, unknown_parent, &mut network);
    assert_eq!(lookups.single_block_lookups.len(), 0);
    rig.expect_parent_request();
    rig.expect_empty_network();
    assert_eq!(lookups.parent_queue.len(), 1);
}
|
||||
|
||||
/// A parent lookup answered with the right parent processes the parent and then the rest of
/// the chain, after which the lookup is removed.
#[test]
fn test_parent_lookup_happy_path() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let chain_hash = block.canonical_root();
    let peer = PeerId::random();

    // Start the parent lookup.
    lookups.search_parent(Box::new(block), peer, &mut network);
    let request_id = rig.expect_parent_request();

    // The peer sends the parent: it is queued for processing and the peer is not reported.
    lookups.parent_lookup_response(request_id, peer, Some(Box::new(parent)), D, &mut network);
    rig.expect_block_process();
    rig.expect_empty_network();

    // The parent is reported as already known; the rest of the chain must then be sent for
    // processing.
    lookups.parent_block_processed(chain_hash, Err(BlockError::BlockIsAlreadyKnown), &mut network);
    rig.expect_parent_chain_process();

    // Once the chain segment is processed the lookup is gone.
    lookups.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut network);
    assert_eq!(lookups.parent_queue.len(), 0);
}
|
||||
|
||||
/// A wrong block in a parent lookup penalizes the peer and re-requests the parent; a later
/// correct answer completes the lookup.
#[test]
fn test_parent_lookup_wrong_response() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let chain_hash = block.canonical_root();
    let peer = PeerId::random();

    // Start the parent lookup.
    lookups.search_parent(Box::new(block), peer, &mut network);
    let first_id = rig.expect_parent_request();

    // The peer answers with an unrelated block: it is reported and the parent re-requested.
    let unrelated_block = rig.rand_block();
    lookups.parent_lookup_response(
        first_id,
        peer,
        Some(Box::new(unrelated_block)),
        D,
        &mut network,
    );
    rig.expect_penalty();
    let second_id = rig.expect_parent_request();

    // The end of the first response must not cause further penalties.
    lookups.parent_lookup_response(first_id, peer, None, D, &mut network);
    rig.expect_empty_network();

    // The second request is answered with the actual parent.
    lookups.parent_lookup_response(second_id, peer, Some(Box::new(parent)), D, &mut network);
    rig.expect_block_process();

    // The parent is processed successfully; the rest of the chain is sent for processing.
    lookups.parent_block_processed(chain_hash, Ok(()), &mut network);
    rig.expect_parent_chain_process();
    lookups.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut network);
    assert_eq!(lookups.parent_queue.len(), 0);
}
|
||||
|
||||
/// An empty response in a parent lookup penalizes the peer and re-requests the parent; a later
/// correct answer completes the lookup.
#[test]
fn test_parent_lookup_empty_response() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let chain_hash = block.canonical_root();
    let peer = PeerId::random();

    // Start the parent lookup.
    lookups.search_parent(Box::new(block), peer, &mut network);
    let first_id = rig.expect_parent_request();

    // The response ends without a block: the peer is reported and the parent re-requested.
    lookups.parent_lookup_response(first_id, peer, None, D, &mut network);
    rig.expect_penalty();
    let second_id = rig.expect_parent_request();

    // The second request is answered with the actual parent.
    lookups.parent_lookup_response(second_id, peer, Some(Box::new(parent)), D, &mut network);
    rig.expect_block_process();

    // The parent is processed successfully; the rest of the chain is sent for processing.
    lookups.parent_block_processed(chain_hash, Ok(()), &mut network);
    rig.expect_parent_chain_process();
    lookups.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut network);
    assert_eq!(lookups.parent_queue.len(), 0);
}
|
||||
|
||||
/// A failed parent request is retried; a later correct answer completes the lookup.
#[test]
fn test_parent_lookup_rpc_failure() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let chain_hash = block.canonical_root();
    let peer = PeerId::random();

    // Start the parent lookup.
    lookups.search_parent(Box::new(block), peer, &mut network);
    let first_id = rig.expect_parent_request();

    // The request fails and must be issued again.
    lookups.parent_lookup_failed(first_id, peer, &mut network);
    let second_id = rig.expect_parent_request();

    // The second request is answered with the actual parent.
    lookups.parent_lookup_response(second_id, peer, Some(Box::new(parent)), D, &mut network);
    rig.expect_block_process();

    // The parent is processed successfully; the rest of the chain is sent for processing.
    lookups.parent_block_processed(chain_hash, Ok(()), &mut network);
    rig.expect_parent_chain_process();
    lookups.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut network);
    assert_eq!(lookups.parent_queue.len(), 0);
}
|
||||
|
||||
/// A parent lookup that keeps failing is eventually dropped and its chain marked as failed.
///
/// Failures alternate between RPC failures and wrong blocks so both kinds are counted.
#[test]
fn test_parent_lookup_too_many_attempts() {
    let (mut bl, mut cx, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let chain_hash = block.canonical_root();
    let peer_id = PeerId::random();

    // Trigger the request
    bl.search_parent(Box::new(block), peer_id, &mut cx);
    for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE + 1 {
        let id = rig.expect_parent_request();
        match i % 2 {
            // make sure every error is accounted for
            0 => {
                // The request fails. It should be tried again.
                bl.parent_lookup_failed(id, peer_id, &mut cx);
            }
            _ => {
                // Send a bad block this time. It should be tried again.
                let bad_block = rig.rand_block();
                bl.parent_lookup_response(id, peer_id, Some(Box::new(bad_block)), D, &mut cx);
                rig.expect_penalty();
            }
        }
        // While the lookup is still within tolerance, each iteration must have added exactly
        // one failed attempt.
        if i < parent_lookup::PARENT_FAIL_TOLERANCE {
            assert_eq!(bl.parent_queue[0].failed_attempts(), i);
        }
    }

    // The lookup was given up on and the chain is remembered as failed.
    assert_eq!(bl.parent_queue.len(), 0);
    assert!(bl.failed_chains.contains(&chain_hash));
}
|
||||
|
||||
/// A parent lookup whose chain of unknown parents reaches the depth tolerance is abandoned:
/// the peer is penalized and the chain marked as failed.
#[test]
fn test_parent_lookup_too_deep() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    // Build a chain of `PARENT_DEPTH_TOLERANCE` blocks, each one the child of the previous.
    let mut chain =
        Vec::<SignedBeaconBlock<E>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE);
    for _ in 0..parent_lookup::PARENT_DEPTH_TOLERANCE {
        let parent_root = match chain.last() {
            Some(previous) => previous.canonical_root(),
            None => Hash256::random(),
        };
        let child = rig.block_with_parent(parent_root);
        chain.push(child);
    }

    // The tip of the chain triggers the lookup.
    let peer = PeerId::random();
    let tip = chain.pop().unwrap();
    let chain_hash = tip.canonical_root();
    lookups.search_parent(Box::new(tip), peer, &mut network);

    // Walk the chain backwards, answering every parent request with a block whose own parent
    // is unknown.
    for ancestor in chain.into_iter().rev() {
        let request_id = rig.expect_parent_request();
        // the block
        lookups.parent_lookup_response(
            request_id,
            peer,
            Some(Box::new(ancestor.clone())),
            D,
            &mut network,
        );
        // the end of the response
        lookups.parent_lookup_response(request_id, peer, None, D, &mut network);
        // the processing request
        rig.expect_block_process();
        // the processing result
        let unknown_parent = Err(BlockError::ParentUnknown(Box::new(ancestor)));
        lookups.parent_block_processed(chain_hash, unknown_parent, &mut network);
    }

    rig.expect_penalty();
    assert!(lookups.failed_chains.contains(&chain_hash));
}
|
||||
|
||||
/// A parent lookup is dropped when the peer it was started with disconnects.
#[test]
fn test_parent_lookup_disconnection() {
    let (mut lookups, mut network, mut rig) = TestRig::test_setup(None);

    let peer = PeerId::random();
    let trigger = Box::new(rig.rand_block());

    lookups.search_parent(trigger, peer, &mut network);
    lookups.peer_disconnected(&peer, &mut network);

    assert!(lookups.parent_queue.is_empty());
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -2,6 +2,7 @@
|
||||
//!
|
||||
//! Stores the various syncing methods for the beacon chain.
|
||||
mod backfill_sync;
|
||||
mod block_lookups;
|
||||
pub mod manager;
|
||||
mod network_context;
|
||||
mod peer_sync_info;
|
||||
@@ -9,6 +10,3 @@ mod range_sync;
|
||||
|
||||
pub use manager::{BatchProcessResult, SyncMessage};
|
||||
pub use range_sync::ChainId;
|
||||
|
||||
/// Type of id of rpc requests sent by sync
|
||||
pub type RequestId = usize;
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
|
||||
//! channel and stores a global RPC ID to perform requests.
|
||||
|
||||
use super::manager::SyncRequestType;
|
||||
use super::manager::{Id, RequestId as SyncRequestId};
|
||||
use super::range_sync::{BatchId, ChainId};
|
||||
use super::RequestId as SyncRequestId;
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::status::ToStatusMessage;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::rpc::{
|
||||
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId,
|
||||
};
|
||||
use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
|
||||
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
|
||||
use slog::{debug, trace, warn};
|
||||
use std::sync::Arc;
|
||||
@@ -26,10 +23,12 @@ pub struct SyncNetworkContext<T: EthSpec> {
|
||||
network_globals: Arc<NetworkGlobals<T>>,
|
||||
|
||||
/// A sequential ID for all RPC requests.
|
||||
request_id: SyncRequestId,
|
||||
request_id: Id,
|
||||
|
||||
/// BlocksByRange requests made by syncing algorithms.
|
||||
range_requests: FnvHashMap<SyncRequestId, SyncRequestType>,
|
||||
/// BlocksByRange requests made by the range syncing algorithm.
|
||||
range_requests: FnvHashMap<Id, (ChainId, BatchId)>,
|
||||
|
||||
backfill_requests: FnvHashMap<Id, BatchId>,
|
||||
|
||||
/// Logger for the `SyncNetworkContext`.
|
||||
log: slog::Logger,
|
||||
@@ -46,6 +45,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
network_globals,
|
||||
request_id: 1,
|
||||
range_requests: FnvHashMap::default(),
|
||||
backfill_requests: FnvHashMap::default(),
|
||||
log,
|
||||
}
|
||||
}
|
||||
@@ -78,7 +78,13 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
"head_slot" => %status_message.head_slot,
|
||||
);
|
||||
|
||||
let _ = self.send_rpc_request(peer_id, Request::Status(status_message.clone()));
|
||||
let request = Request::Status(status_message.clone());
|
||||
let request_id = RequestId::Router;
|
||||
let _ = self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,7 +96,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
request: BlocksByRangeRequest,
|
||||
chain_id: ChainId,
|
||||
batch_id: BatchId,
|
||||
) -> Result<SyncRequestId, &'static str> {
|
||||
) -> Result<Id, &'static str> {
|
||||
trace!(
|
||||
self.log,
|
||||
"Sending BlocksByRange Request";
|
||||
@@ -98,10 +104,16 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
"count" => request.count,
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?;
|
||||
self.range_requests
|
||||
.insert(req_id, SyncRequestType::RangeSync(batch_id, chain_id));
|
||||
Ok(req_id)
|
||||
let request = Request::BlocksByRange(request);
|
||||
let id = self.next_id();
|
||||
let request_id = RequestId::Sync(SyncRequestId::RangeSync { id });
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
})?;
|
||||
self.range_requests.insert(id, (chain_id, batch_id));
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// A blocks by range request sent by the backfill sync algorithm
|
||||
@@ -110,7 +122,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
peer_id: PeerId,
|
||||
request: BlocksByRangeRequest,
|
||||
batch_id: BatchId,
|
||||
) -> Result<SyncRequestId, &'static str> {
|
||||
) -> Result<Id, &'static str> {
|
||||
trace!(
|
||||
self.log,
|
||||
"Sending backfill BlocksByRange Request";
|
||||
@@ -118,21 +130,24 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
"count" => request.count,
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?;
|
||||
self.range_requests
|
||||
.insert(req_id, SyncRequestType::BackFillSync(batch_id));
|
||||
Ok(req_id)
|
||||
let request = Request::BlocksByRange(request);
|
||||
let id = self.next_id();
|
||||
let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id });
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
})?;
|
||||
self.backfill_requests.insert(id, batch_id);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Received a blocks by range response.
|
||||
pub fn blocks_by_range_response(
|
||||
pub fn range_sync_response(
|
||||
&mut self,
|
||||
request_id: usize,
|
||||
request_id: Id,
|
||||
remove: bool,
|
||||
) -> Option<SyncRequestType> {
|
||||
// NOTE: we can't guarantee that the request must be registered as it could receive more
|
||||
// than an error, and be removed after receiving the first one.
|
||||
// FIXME: https://github.com/sigp/lighthouse/issues/1634
|
||||
) -> Option<(ChainId, BatchId)> {
|
||||
if remove {
|
||||
self.range_requests.remove(&request_id)
|
||||
} else {
|
||||
@@ -140,12 +155,21 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a blocks by root request.
|
||||
pub fn blocks_by_root_request(
|
||||
/// Received a blocks by range response.
|
||||
pub fn backfill_sync_response(&mut self, request_id: Id, remove: bool) -> Option<BatchId> {
|
||||
if remove {
|
||||
self.backfill_requests.remove(&request_id)
|
||||
} else {
|
||||
self.backfill_requests.get(&request_id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a blocks by root request for a single block lookup.
|
||||
pub fn single_block_lookup_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: BlocksByRootRequest,
|
||||
) -> Result<usize, &'static str> {
|
||||
) -> Result<Id, &'static str> {
|
||||
trace!(
|
||||
self.log,
|
||||
"Sending BlocksByRoot Request";
|
||||
@@ -153,7 +177,39 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
"count" => request.block_roots.len(),
|
||||
"peer" => %peer_id
|
||||
);
|
||||
self.send_rpc_request(peer_id, Request::BlocksByRoot(request))
|
||||
let request = Request::BlocksByRoot(request);
|
||||
let id = self.next_id();
|
||||
let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id });
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
})?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Sends a blocks by root request for a parent request.
|
||||
pub fn parent_lookup_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: BlocksByRootRequest,
|
||||
) -> Result<Id, &'static str> {
|
||||
trace!(
|
||||
self.log,
|
||||
"Sending BlocksByRoot Request";
|
||||
"method" => "BlocksByRoot",
|
||||
"count" => request.block_roots.len(),
|
||||
"peer" => %peer_id
|
||||
);
|
||||
let request = Request::BlocksByRoot(request);
|
||||
let id = self.next_id();
|
||||
let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request,
|
||||
request_id,
|
||||
})?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Terminates the connection with the peer and bans them.
|
||||
@@ -184,22 +240,6 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
});
|
||||
}
|
||||
|
||||
/// Sends an RPC request.
|
||||
fn send_rpc_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: Request,
|
||||
) -> Result<usize, &'static str> {
|
||||
let request_id = self.request_id;
|
||||
self.request_id += 1;
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: RequestId::Sync(request_id),
|
||||
request,
|
||||
})?;
|
||||
Ok(request_id)
|
||||
}
|
||||
|
||||
/// Subscribes to core topics.
|
||||
pub fn subscribe_core_topics(&mut self) {
|
||||
self.network_send
|
||||
@@ -216,4 +256,10 @@ impl<T: EthSpec> SyncNetworkContext<T> {
|
||||
"Network channel send Failed"
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the current request id and advances the counter by one.
fn next_id(&mut self) -> Id {
    let following = self.request_id + 1;
    std::mem::replace(&mut self.request_id, following)
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::sync::RequestId;
|
||||
use crate::sync::manager::Id;
|
||||
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
|
||||
use lighthouse_network::PeerId;
|
||||
use std::collections::HashSet;
|
||||
@@ -93,7 +93,7 @@ pub enum BatchState<T: EthSpec> {
|
||||
/// The batch has failed either downloading or processing, but can be requested again.
|
||||
AwaitingDownload,
|
||||
/// The batch is being downloaded.
|
||||
Downloading(PeerId, Vec<SignedBeaconBlock<T>>, RequestId),
|
||||
Downloading(PeerId, Vec<SignedBeaconBlock<T>>, Id),
|
||||
/// The batch has been completely downloaded and is ready for processing.
|
||||
AwaitingProcessing(PeerId, Vec<SignedBeaconBlock<T>>),
|
||||
/// The batch is being processed.
|
||||
@@ -167,7 +167,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
|
||||
}
|
||||
|
||||
/// Verifies if an incoming block belongs to this batch.
|
||||
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &RequestId) -> bool {
|
||||
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
|
||||
if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state {
|
||||
return peer_id == expected_peer && expected_id == request_id;
|
||||
}
|
||||
@@ -312,7 +312,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
|
||||
pub fn start_downloading_from_peer(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
) -> Result<(), WrongState> {
|
||||
match self.state.poison() {
|
||||
BatchState::AwaitingDownload => {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use super::batch::{BatchInfo, BatchState};
|
||||
use crate::beacon_processor::ProcessId;
|
||||
use crate::beacon_processor::ChainSegmentProcessId;
|
||||
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId};
|
||||
use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
@@ -214,7 +214,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> ProcessingResult {
|
||||
// check if we have this batch
|
||||
@@ -300,7 +300,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
// for removing chains and checking completion is in the callback.
|
||||
|
||||
let blocks = batch.start_processing()?;
|
||||
let process_id = ProcessId::RangeBatchId(self.id, batch_id);
|
||||
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
|
||||
self.current_processing_batch = Some(batch_id);
|
||||
|
||||
if let Err(e) = self
|
||||
@@ -807,7 +807,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
) -> ProcessingResult {
|
||||
if let Some(batch) = self.batches.get_mut(&batch_id) {
|
||||
// A batch could be retried without the peer failing the request (disconnecting/
|
||||
|
||||
@@ -45,8 +45,9 @@ use super::chain_collection::ChainCollection;
|
||||
use super::sync_type::RangeSyncType;
|
||||
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::manager::Id;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::{BatchProcessResult, RequestId};
|
||||
use crate::sync::BatchProcessResult;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::PeerId;
|
||||
use lighthouse_network::SyncInfo;
|
||||
@@ -201,7 +202,7 @@ where
|
||||
peer_id: PeerId,
|
||||
chain_id: ChainId,
|
||||
batch_id: BatchId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
// check if this chunk removes the chain
|
||||
@@ -300,7 +301,7 @@ where
|
||||
peer_id: PeerId,
|
||||
batch_id: BatchId,
|
||||
chain_id: ChainId,
|
||||
request_id: RequestId,
|
||||
request_id: Id,
|
||||
) {
|
||||
// check that this request is pending
|
||||
match self.chains.call_by_id(chain_id, |chain| {
|
||||
@@ -364,6 +365,7 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::service::RequestId;
|
||||
use crate::NetworkMessage;
|
||||
|
||||
use super::*;
|
||||
@@ -494,10 +496,7 @@ mod tests {
|
||||
}
|
||||
|
||||
/// Reads an BlocksByRange request to a given peer from the network receiver channel.
|
||||
fn grab_request(
|
||||
&mut self,
|
||||
expected_peer: &PeerId,
|
||||
) -> (lighthouse_network::rpc::RequestId, BlocksByRangeRequest) {
|
||||
fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) {
|
||||
if let Some(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: Request::BlocksByRange(request),
|
||||
|
||||
Reference in New Issue
Block a user