mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 13:54:36 +00:00
Transition block lookup sync to range sync (#6122)
* Transition block lookup sync to range sync * Log unexpected state * Merge remote-tracking branch 'sigp/unstable' into lookup-to-range * Add docs * Merge remote-tracking branch 'sigp/unstable' into lookup-to-range
This commit is contained in:
@@ -2790,12 +2790,12 @@ pub fn build_log(level: slog::Level, logger_type: LoggerType) -> Logger {
|
|||||||
match logger_type {
|
match logger_type {
|
||||||
LoggerType::Test => {
|
LoggerType::Test => {
|
||||||
let drain = FullFormat::new(TermDecorator::new().build()).build().fuse();
|
let drain = FullFormat::new(TermDecorator::new().build()).build().fuse();
|
||||||
let drain = Async::new(drain).build().fuse();
|
let drain = Async::new(drain).chan_size(10_000).build().fuse();
|
||||||
Logger::root(drain.filter_level(level).fuse(), o!())
|
Logger::root(drain.filter_level(level).fuse(), o!())
|
||||||
}
|
}
|
||||||
LoggerType::CI => {
|
LoggerType::CI => {
|
||||||
let drain = FullFormat::new(ci_decorator()).build().fuse();
|
let drain = FullFormat::new(ci_decorator()).build().fuse();
|
||||||
let drain = Async::new(drain).build().fuse();
|
let drain = Async::new(drain).chan_size(10_000).build().fuse();
|
||||||
Logger::root(drain.filter_level(level).fuse(), o!())
|
Logger::root(drain.filter_level(level).fuse(), o!())
|
||||||
}
|
}
|
||||||
LoggerType::Null => {
|
LoggerType::Null => {
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use store::MemoryStore;
|
use store::MemoryStore;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio::sync::mpsc::{self, error::TrySendError};
|
use tokio::sync::mpsc::{self, error::TrySendError};
|
||||||
use types::*;
|
use types::*;
|
||||||
|
|
||||||
@@ -831,7 +832,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
/// Send a message to `sync_tx`.
|
/// Send a message to `sync_tx`.
|
||||||
///
|
///
|
||||||
/// Creates a log if there is an internal error.
|
/// Creates a log if there is an internal error.
|
||||||
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
|
pub(crate) fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
|
||||||
self.sync_tx.send(message).unwrap_or_else(|e| {
|
self.sync_tx.send(message).unwrap_or_else(|e| {
|
||||||
debug!(self.log, "Could not send message to the sync service";
|
debug!(self.log, "Could not send message to the sync service";
|
||||||
"error" => %e)
|
"error" => %e)
|
||||||
@@ -859,6 +860,7 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
|
|||||||
// processor (but not much else).
|
// processor (but not much else).
|
||||||
pub fn null_for_testing(
|
pub fn null_for_testing(
|
||||||
network_globals: Arc<NetworkGlobals<E>>,
|
network_globals: Arc<NetworkGlobals<E>>,
|
||||||
|
sync_tx: UnboundedSender<SyncMessage<E>>,
|
||||||
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
|
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
|
||||||
executor: TaskExecutor,
|
executor: TaskExecutor,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
@@ -871,7 +873,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
|
|||||||
} = <_>::default();
|
} = <_>::default();
|
||||||
|
|
||||||
let (network_tx, _network_rx) = mpsc::unbounded_channel();
|
let (network_tx, _network_rx) = mpsc::unbounded_channel();
|
||||||
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let network_beacon_processor = Self {
|
let network_beacon_processor = Self {
|
||||||
beacon_processor_send: beacon_processor_tx,
|
beacon_processor_send: beacon_processor_tx,
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
|
|||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::sync::block_lookups::common::ResponseType;
|
use crate::sync::block_lookups::common::ResponseType;
|
||||||
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
|
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
|
||||||
|
use crate::sync::SyncMessage;
|
||||||
use beacon_chain::block_verification_types::AsBlock;
|
use beacon_chain::block_verification_types::AsBlock;
|
||||||
use beacon_chain::data_availability_checker::{
|
use beacon_chain::data_availability_checker::{
|
||||||
AvailabilityCheckError, AvailabilityCheckErrorCategory,
|
AvailabilityCheckError, AvailabilityCheckErrorCategory,
|
||||||
@@ -55,7 +56,10 @@ mod tests;
|
|||||||
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
|
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
|
||||||
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
|
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
|
||||||
/// is further back than the most recent head slot.
|
/// is further back than the most recent head slot.
|
||||||
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
|
///
|
||||||
|
/// Have the same value as range's sync tolerance to consider a peer synced. Once sync lookup
|
||||||
|
/// reaches the maximum depth it will force trigger range sync.
|
||||||
|
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE;
|
||||||
|
|
||||||
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
|
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
|
||||||
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
|
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
|
||||||
@@ -254,22 +258,59 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
// blocks on top of A forming A -> C. The malicious peer forces us to fetch C
|
// blocks on top of A forming A -> C. The malicious peer forces us to fetch C
|
||||||
// from it, which will result in parent A hitting the chain_too_long error. Then
|
// from it, which will result in parent A hitting the chain_too_long error. Then
|
||||||
// the valid chain A -> B is dropped too.
|
// the valid chain A -> B is dropped too.
|
||||||
if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) {
|
//
|
||||||
// Drop all lookups descending from the child of the too long parent chain
|
// `find_oldest_fork_ancestor` should never return Err, unwrapping to tip for
|
||||||
if let Some((lookup_id, lookup)) = self
|
// complete-ness
|
||||||
|
let parent_chain_tip = parent_chain.tip;
|
||||||
|
let block_to_drop =
|
||||||
|
find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip);
|
||||||
|
// Drop all lookups descending from the child of the too long parent chain
|
||||||
|
if let Some((lookup_id, lookup)) = self
|
||||||
|
.single_block_lookups
|
||||||
|
.iter()
|
||||||
|
.find(|(_, l)| l.block_root() == block_to_drop)
|
||||||
|
{
|
||||||
|
// If a lookup chain is too long, we can't distinguish a valid chain from a
|
||||||
|
// malicious one. We must attempt to sync this chain to not lose liveness. If
|
||||||
|
// the chain grows too long, we stop lookup sync and transition this head to
|
||||||
|
// forward range sync. We need to tell range sync which head to sync to, and
|
||||||
|
// from which peers. The lookup of the very tip of this chain may contain zero
|
||||||
|
// peers if it's the parent-child lookup. So we do a bit of a trick here:
|
||||||
|
// - Tell range sync to sync to the tip's root (if available, else its ancestor)
|
||||||
|
// - But use all peers in the ancestor lookup, which should have at least one
|
||||||
|
// peer, and its peer set is a strict superset of the tip's lookup.
|
||||||
|
if let Some((_, tip_lookup)) = self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.iter()
|
.iter()
|
||||||
.find(|(_, l)| l.block_root() == block_to_drop)
|
.find(|(_, l)| l.block_root() == parent_chain_tip)
|
||||||
{
|
{
|
||||||
for &peer_id in lookup.all_peers() {
|
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
|
||||||
cx.report_peer(
|
peers: lookup.all_peers().copied().collect(),
|
||||||
peer_id,
|
head_slot: tip_lookup.peek_downloaded_block_slot(),
|
||||||
PeerAction::LowToleranceError,
|
head_root: parent_chain_tip,
|
||||||
"chain_too_long",
|
});
|
||||||
);
|
} else {
|
||||||
}
|
// Should never happen, log error and continue the lookup drop
|
||||||
self.drop_lookup_and_children(*lookup_id);
|
error!(self.log, "Unable to transition lookup to range sync";
|
||||||
|
"error" => "Parent chain tip lookup not found",
|
||||||
|
"block_root" => ?parent_chain_tip
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do not downscore peers here. Because we can't distinguish a valid chain from
|
||||||
|
// a malicious one we may penalize honest peers for attempting to discover us a
|
||||||
|
// valid chain. Until blocks_by_range allows to specify a tip, for example with
|
||||||
|
// https://github.com/ethereum/consensus-specs/pull/3845 we will have poor
|
||||||
|
// attributability. A peer can send us garbage blocks over blocks_by_root, and
|
||||||
|
// then correct blocks via blocks_by_range.
|
||||||
|
|
||||||
|
self.drop_lookup_and_children(*lookup_id);
|
||||||
|
} else {
|
||||||
|
// Should never happen
|
||||||
|
error!(self.log, "Unable to transition lookup to range sync";
|
||||||
|
"error" => "Block to drop lookup not found",
|
||||||
|
"block_root" => ?block_to_drop
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
|
|||||||
use store::Hash256;
|
use store::Hash256;
|
||||||
use strum::IntoStaticStr;
|
use strum::IntoStaticStr;
|
||||||
use types::blob_sidecar::FixedBlobSidecarList;
|
use types::blob_sidecar::FixedBlobSidecarList;
|
||||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
|
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
|
||||||
|
|
||||||
// Dedicated enum for LookupResult to force its usage
|
// Dedicated enum for LookupResult to force its usage
|
||||||
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
||||||
@@ -91,6 +91,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
|
||||||
|
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
|
||||||
|
self.block_request_state
|
||||||
|
.state
|
||||||
|
.peek_downloaded_data()
|
||||||
|
.map(|block| block.slot())
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the block root that is being requested.
|
/// Get the block root that is being requested.
|
||||||
pub fn block_root(&self) -> Hash256 {
|
pub fn block_root(&self) -> Hash256 {
|
||||||
self.block_root
|
self.block_root
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||||
use crate::sync::manager::{BlockProcessType, SyncManager};
|
use crate::sync::manager::{BlockProcessType, SyncManager};
|
||||||
use crate::sync::peer_sampling::SamplingConfig;
|
use crate::sync::peer_sampling::SamplingConfig;
|
||||||
|
use crate::sync::range_sync::RangeSyncType;
|
||||||
use crate::sync::{SamplingId, SyncMessage};
|
use crate::sync::{SamplingId, SyncMessage};
|
||||||
use crate::NetworkMessage;
|
use crate::NetworkMessage;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -78,6 +79,8 @@ struct TestRig {
|
|||||||
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
||||||
/// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests)
|
/// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests)
|
||||||
network_rx_queue: Vec<NetworkMessage<E>>,
|
network_rx_queue: Vec<NetworkMessage<E>>,
|
||||||
|
/// Receiver for `SyncMessage` from the network
|
||||||
|
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
|
||||||
/// To send `SyncMessage`. For sending RPC responses or block processing results to sync.
|
/// To send `SyncMessage`. For sending RPC responses or block processing results to sync.
|
||||||
sync_manager: SyncManager<T>,
|
sync_manager: SyncManager<T>,
|
||||||
/// To manipulate sync state and peer connection status
|
/// To manipulate sync state and peer connection status
|
||||||
@@ -137,6 +140,7 @@ impl TestRig {
|
|||||||
let chain = harness.chain.clone();
|
let chain = harness.chain.clone();
|
||||||
|
|
||||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||||
|
let (sync_tx, sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
|
||||||
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
|
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
|
||||||
// column assignments
|
// column assignments
|
||||||
let network_config = Arc::new(NetworkConfig::default());
|
let network_config = Arc::new(NetworkConfig::default());
|
||||||
@@ -148,13 +152,12 @@ impl TestRig {
|
|||||||
));
|
));
|
||||||
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
|
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
|
||||||
globals,
|
globals,
|
||||||
|
sync_tx,
|
||||||
chain.clone(),
|
chain.clone(),
|
||||||
harness.runtime.task_executor.clone(),
|
harness.runtime.task_executor.clone(),
|
||||||
log.clone(),
|
log.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<E>>();
|
|
||||||
|
|
||||||
let fork_name = chain.spec.fork_name_at_slot::<E>(chain.slot().unwrap());
|
let fork_name = chain.spec.fork_name_at_slot::<E>(chain.slot().unwrap());
|
||||||
|
|
||||||
// All current tests expect synced and EL online state
|
// All current tests expect synced and EL online state
|
||||||
@@ -168,13 +171,15 @@ impl TestRig {
|
|||||||
beacon_processor_rx_queue: vec![],
|
beacon_processor_rx_queue: vec![],
|
||||||
network_rx,
|
network_rx,
|
||||||
network_rx_queue: vec![],
|
network_rx_queue: vec![],
|
||||||
|
sync_rx,
|
||||||
rng,
|
rng,
|
||||||
network_globals: beacon_processor.network_globals.clone(),
|
network_globals: beacon_processor.network_globals.clone(),
|
||||||
sync_manager: SyncManager::new(
|
sync_manager: SyncManager::new(
|
||||||
chain,
|
chain,
|
||||||
network_tx,
|
network_tx,
|
||||||
beacon_processor.into(),
|
beacon_processor.into(),
|
||||||
sync_recv,
|
// Pass empty recv not tied to any tx
|
||||||
|
mpsc::unbounded_channel().1,
|
||||||
SamplingConfig::Custom {
|
SamplingConfig::Custom {
|
||||||
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
|
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
|
||||||
},
|
},
|
||||||
@@ -237,6 +242,13 @@ impl TestRig {
|
|||||||
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
|
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drain all sync messages in the sync_rx attached to the beacon processor
|
||||||
|
fn drain_sync_rx(&mut self) {
|
||||||
|
while let Ok(sync_message) = self.sync_rx.try_recv() {
|
||||||
|
self.send_sync_message(sync_message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn rand_block(&mut self) -> SignedBeaconBlock<E> {
|
fn rand_block(&mut self) -> SignedBeaconBlock<E> {
|
||||||
self.rand_block_and_blobs(NumBlobs::None).0
|
self.rand_block_and_blobs(NumBlobs::None).0
|
||||||
}
|
}
|
||||||
@@ -293,6 +305,10 @@ impl TestRig {
|
|||||||
self.sync_manager.active_parent_lookups().len()
|
self.sync_manager.active_parent_lookups().len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) {
|
||||||
|
self.sync_manager.get_range_sync_chains().unwrap().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
fn assert_single_lookups_count(&self, count: usize) {
|
fn assert_single_lookups_count(&self, count: usize) {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
self.active_single_lookups_count(),
|
self.active_single_lookups_count(),
|
||||||
@@ -1696,7 +1712,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
rig.expect_penalty(peer_id, "chain_too_long");
|
// Should create a new syncing chain
|
||||||
|
rig.drain_sync_rx();
|
||||||
|
assert_eq!(
|
||||||
|
rig.active_range_sync_chain(),
|
||||||
|
(
|
||||||
|
RangeSyncType::Head,
|
||||||
|
Slot::new(0),
|
||||||
|
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
|
||||||
|
rig.expect_no_penalty_for(peer_id);
|
||||||
rig.assert_failed_chain(chain_hash);
|
rig.assert_failed_chain(chain_hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1723,7 +1750,18 @@ fn test_parent_lookup_too_deep_grow_tip() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
rig.expect_penalty(peer_id, "chain_too_long");
|
// Should create a new syncing chain
|
||||||
|
rig.drain_sync_rx();
|
||||||
|
assert_eq!(
|
||||||
|
rig.active_range_sync_chain(),
|
||||||
|
(
|
||||||
|
RangeSyncType::Head,
|
||||||
|
Slot::new(0),
|
||||||
|
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
|
||||||
|
rig.expect_no_penalty_for(peer_id);
|
||||||
rig.assert_failed_chain(tip.canonical_root());
|
rig.assert_failed_chain(tip.canonical_root());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -94,6 +94,15 @@ pub enum SyncMessage<E: EthSpec> {
|
|||||||
/// A useful peer has been discovered.
|
/// A useful peer has been discovered.
|
||||||
AddPeer(PeerId, SyncInfo),
|
AddPeer(PeerId, SyncInfo),
|
||||||
|
|
||||||
|
/// Force trigger range sync for a set of peers given a head they claim to have imported. Used
|
||||||
|
/// by block lookup to trigger range sync if a parent chain grows too large.
|
||||||
|
AddPeersForceRangeSync {
|
||||||
|
peers: Vec<PeerId>,
|
||||||
|
head_root: Hash256,
|
||||||
|
/// Sync lookup may not know the Slot of this head. However this situation is very rare.
|
||||||
|
head_slot: Option<Slot>,
|
||||||
|
},
|
||||||
|
|
||||||
/// A block has been received from the RPC.
|
/// A block has been received from the RPC.
|
||||||
RpcBlock {
|
RpcBlock {
|
||||||
request_id: SyncRequestId,
|
request_id: SyncRequestId,
|
||||||
@@ -322,6 +331,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn get_range_sync_chains(
|
||||||
|
&self,
|
||||||
|
) -> Result<Option<(RangeSyncType, Slot, Slot)>, &'static str> {
|
||||||
|
self.range_sync.state()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
|
pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
|
||||||
self.block_lookups.get_failed_chains()
|
self.block_lookups.get_failed_chains()
|
||||||
@@ -376,11 +392,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
let sync_type = remote_sync_type(&local, &remote, &self.chain);
|
let sync_type = remote_sync_type(&local, &remote, &self.chain);
|
||||||
|
|
||||||
// update the state of the peer.
|
// update the state of the peer.
|
||||||
let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type);
|
let is_still_connected = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type);
|
||||||
|
if is_still_connected {
|
||||||
if matches!(sync_type, PeerSyncType::Advanced) && should_add {
|
match sync_type {
|
||||||
self.range_sync
|
PeerSyncType::Behind => {} // Do nothing
|
||||||
.add_peer(&mut self.network, local, peer_id, remote);
|
PeerSyncType::Advanced => {
|
||||||
|
self.range_sync
|
||||||
|
.add_peer(&mut self.network, local, peer_id, remote);
|
||||||
|
}
|
||||||
|
PeerSyncType::FullySynced => {
|
||||||
|
// Sync considers this peer close enough to the head to not trigger range sync.
|
||||||
|
// Range sync handles well syncing large ranges of blocks, of a least a few blocks.
|
||||||
|
// However this peer may be in a fork that we should sync but we have not discovered
|
||||||
|
// yet. If the head of the peer is unknown, attempt block lookup first. If the
|
||||||
|
// unknown head turns out to be on a longer fork, it will trigger range sync.
|
||||||
|
//
|
||||||
|
// A peer should always be considered `Advanced` if its finalized root is
|
||||||
|
// unknown and ahead of ours, so we don't check for that root here.
|
||||||
|
//
|
||||||
|
// TODO: This fork-choice check is potentially duplicated, review code
|
||||||
|
if !self.chain.block_is_known_to_fork_choice(&remote.head_root) {
|
||||||
|
self.handle_unknown_block_root(peer_id, remote.head_root);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.update_sync_state();
|
self.update_sync_state();
|
||||||
@@ -391,6 +426,44 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trigger range sync for a set of peers that claim to have imported a head unknown to us.
|
||||||
|
fn add_peers_force_range_sync(
|
||||||
|
&mut self,
|
||||||
|
peers: &[PeerId],
|
||||||
|
head_root: Hash256,
|
||||||
|
head_slot: Option<Slot>,
|
||||||
|
) {
|
||||||
|
let status = self.chain.status_message();
|
||||||
|
let local = SyncInfo {
|
||||||
|
head_slot: status.head_slot,
|
||||||
|
head_root: status.head_root,
|
||||||
|
finalized_epoch: status.finalized_epoch,
|
||||||
|
finalized_root: status.finalized_root,
|
||||||
|
};
|
||||||
|
|
||||||
|
let head_slot = head_slot.unwrap_or_else(|| {
|
||||||
|
debug!(self.log,
|
||||||
|
"On add peers force range sync assuming local head_slot";
|
||||||
|
"local_head_slot" => local.head_slot,
|
||||||
|
"head_root" => ?head_root
|
||||||
|
);
|
||||||
|
local.head_slot
|
||||||
|
});
|
||||||
|
|
||||||
|
let remote = SyncInfo {
|
||||||
|
head_slot,
|
||||||
|
head_root,
|
||||||
|
// Set finalized to same as local to trigger Head sync
|
||||||
|
finalized_epoch: local.finalized_epoch,
|
||||||
|
finalized_root: local.finalized_root,
|
||||||
|
};
|
||||||
|
|
||||||
|
for peer_id in peers {
|
||||||
|
self.range_sync
|
||||||
|
.add_peer(&mut self.network, local.clone(), *peer_id, remote.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handles RPC errors related to requests that were emitted from the sync manager.
|
/// Handles RPC errors related to requests that were emitted from the sync manager.
|
||||||
fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) {
|
fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) {
|
||||||
trace!(self.log, "Sync manager received a failed RPC");
|
trace!(self.log, "Sync manager received a failed RPC");
|
||||||
@@ -476,8 +549,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Updates the syncing state of a peer.
|
/// Updates the syncing state of a peer.
|
||||||
/// Return whether the peer should be used for range syncing or not, according to its
|
/// Return true if the peer is still connected and known to the peers DB
|
||||||
/// connection status.
|
|
||||||
fn update_peer_sync_state(
|
fn update_peer_sync_state(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: &PeerId,
|
peer_id: &PeerId,
|
||||||
@@ -686,6 +758,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
SyncMessage::AddPeer(peer_id, info) => {
|
SyncMessage::AddPeer(peer_id, info) => {
|
||||||
self.add_peer(peer_id, info);
|
self.add_peer(peer_id, info);
|
||||||
}
|
}
|
||||||
|
SyncMessage::AddPeersForceRangeSync {
|
||||||
|
peers,
|
||||||
|
head_root,
|
||||||
|
head_slot,
|
||||||
|
} => {
|
||||||
|
self.add_peers_force_range_sync(&peers, head_root, head_slot);
|
||||||
|
}
|
||||||
SyncMessage::RpcBlock {
|
SyncMessage::RpcBlock {
|
||||||
request_id,
|
request_id,
|
||||||
peer_id,
|
peer_id,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlock
|
|||||||
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
|
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
|
||||||
use super::manager::BlockProcessType;
|
use super::manager::BlockProcessType;
|
||||||
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
|
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
|
||||||
|
use super::SyncMessage;
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||||
use crate::service::NetworkMessage;
|
use crate::service::NetworkMessage;
|
||||||
@@ -249,6 +250,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_sync_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
|
||||||
|
self.network_beacon_processor
|
||||||
|
.send_sync_message(sync_message);
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the ids of all the requests made to the given peer_id.
|
/// Returns the ids of all the requests made to the given peer_id.
|
||||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
|
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
|
||||||
let failed_range_ids =
|
let failed_range_ids =
|
||||||
|
|||||||
@@ -386,6 +386,7 @@ where
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||||
|
use crate::sync::SyncMessage;
|
||||||
use crate::NetworkMessage;
|
use crate::NetworkMessage;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -690,6 +691,7 @@ mod tests {
|
|||||||
log.new(o!("component" => "range")),
|
log.new(o!("component" => "range")),
|
||||||
);
|
);
|
||||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||||
|
let (sync_tx, _sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
|
||||||
let network_config = Arc::new(NetworkConfig::default());
|
let network_config = Arc::new(NetworkConfig::default());
|
||||||
let globals = Arc::new(NetworkGlobals::new_test_globals(
|
let globals = Arc::new(NetworkGlobals::new_test_globals(
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
@@ -700,6 +702,7 @@ mod tests {
|
|||||||
let (network_beacon_processor, beacon_processor_rx) =
|
let (network_beacon_processor, beacon_processor_rx) =
|
||||||
NetworkBeaconProcessor::null_for_testing(
|
NetworkBeaconProcessor::null_for_testing(
|
||||||
globals.clone(),
|
globals.clone(),
|
||||||
|
sync_tx,
|
||||||
chain.clone(),
|
chain.clone(),
|
||||||
harness.runtime.task_executor.clone(),
|
harness.runtime.task_executor.clone(),
|
||||||
log.clone(),
|
log.clone(),
|
||||||
|
|||||||
Reference in New Issue
Block a user