mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 20:57:10 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states
This commit is contained in:
@@ -44,3 +44,8 @@ strum = "0.24.0"
|
||||
tokio-util = { version = "0.6.3", features = ["time"] }
|
||||
derivative = "2.2.0"
|
||||
delay_map = "0.1.1"
|
||||
ethereum-types = { version = "0.12.1", optional = true }
|
||||
|
||||
[features]
|
||||
deterministic_long_lived_attnets = [ "ethereum-types" ]
|
||||
# default = ["deterministic_long_lived_attnets"]
|
||||
|
||||
@@ -489,6 +489,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
/// Create a new `Work` event for some block, where the result from computation (if any) is
|
||||
/// sent to the other side of `result_tx`.
|
||||
pub fn rpc_beacon_block(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
@@ -496,6 +497,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::RpcBlock {
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
@@ -577,6 +579,7 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
},
|
||||
},
|
||||
ReadyWork::RpcBlock(QueuedRpcBlock {
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
@@ -584,6 +587,7 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
}) => Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::RpcBlock {
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
@@ -705,6 +709,7 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
RpcBlock {
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
@@ -1532,11 +1537,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
* Verification for beacon blocks received during syncing via RPC.
|
||||
*/
|
||||
Work::RpcBlock {
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
should_process,
|
||||
} => task_spawner.spawn_async(worker.process_rpc_block(
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
|
||||
@@ -242,6 +242,7 @@ impl TestRig {
|
||||
|
||||
pub fn enqueue_rpc_block(&self) {
|
||||
let event = WorkEvent::rpc_beacon_block(
|
||||
self.next_block.canonical_root(),
|
||||
self.next_block.clone(),
|
||||
std::time::Duration::default(),
|
||||
BlockProcessType::ParentLookup {
|
||||
@@ -253,6 +254,7 @@ impl TestRig {
|
||||
|
||||
pub fn enqueue_single_lookup_rpc_block(&self) {
|
||||
let event = WorkEvent::rpc_beacon_block(
|
||||
self.next_block.canonical_root(),
|
||||
self.next_block.clone(),
|
||||
std::time::Duration::default(),
|
||||
BlockProcessType::SingleBlock { id: 1 },
|
||||
|
||||
@@ -109,6 +109,7 @@ pub struct QueuedGossipBlock<T: BeaconChainTypes> {
|
||||
/// A block that arrived for processing when the same block was being imported over gossip.
|
||||
/// It is queued for later import.
|
||||
pub struct QueuedRpcBlock<T: EthSpec> {
|
||||
pub block_root: Hash256,
|
||||
pub block: Arc<SignedBeaconBlock<T>>,
|
||||
pub process_type: BlockProcessType,
|
||||
pub seen_timestamp: Duration,
|
||||
|
||||
@@ -713,16 +713,28 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
block_delay,
|
||||
);
|
||||
|
||||
let verification_result = self
|
||||
.chain
|
||||
.clone()
|
||||
.verify_block_for_gossip(block.clone())
|
||||
.await;
|
||||
|
||||
let block_root = if let Ok(verified_block) = &verification_result {
|
||||
verified_block.block_root
|
||||
} else {
|
||||
block.canonical_root()
|
||||
};
|
||||
|
||||
// Write the time the block was observed into delay cache.
|
||||
self.chain.block_times_cache.write().set_time_observed(
|
||||
block.canonical_root(),
|
||||
block_root,
|
||||
block.slot(),
|
||||
seen_duration,
|
||||
Some(peer_id.to_string()),
|
||||
Some(peer_client.to_string()),
|
||||
);
|
||||
|
||||
let verified_block = match self.chain.clone().verify_block_for_gossip(block).await {
|
||||
let verified_block = match verification_result {
|
||||
Ok(verified_block) => {
|
||||
if block_delay >= self.chain.slot_clock.unagg_attestation_production_delay() {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_GOSSIP_ARRIVED_LATE_TOTAL);
|
||||
@@ -762,9 +774,9 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
debug!(
|
||||
self.log,
|
||||
"Unknown parent for gossip block";
|
||||
"root" => ?block.canonical_root()
|
||||
"root" => ?block_root
|
||||
);
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
|
||||
return None;
|
||||
}
|
||||
Err(e @ BlockError::BeaconChainError(_)) => {
|
||||
@@ -918,10 +930,11 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
_seen_duration: Duration,
|
||||
) {
|
||||
let block: Arc<_> = verified_block.block.clone();
|
||||
let block_root = verified_block.block_root;
|
||||
|
||||
match self
|
||||
.chain
|
||||
.process_block(verified_block, CountUnrealized::True)
|
||||
.process_block(block_root, verified_block, CountUnrealized::True)
|
||||
.await
|
||||
{
|
||||
Ok(block_root) => {
|
||||
@@ -956,7 +969,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"Block with unknown parent attempted to be processed";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
|
||||
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
|
||||
}
|
||||
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
|
||||
debug!(
|
||||
@@ -970,7 +983,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
self.log,
|
||||
"Invalid gossip beacon block";
|
||||
"outcome" => ?other,
|
||||
"block root" => ?block.canonical_root(),
|
||||
"block root" => ?block_root,
|
||||
"block slot" => block.slot()
|
||||
);
|
||||
self.gossip_penalize_peer(
|
||||
|
||||
@@ -338,7 +338,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
if blocks_sent < (req.count as usize) {
|
||||
debug!(
|
||||
self.log,
|
||||
"BlocksByRange Response processed";
|
||||
"BlocksByRange outgoing response processed";
|
||||
"peer" => %peer_id,
|
||||
"msg" => "Failed to return all requested blocks",
|
||||
"start_slot" => req.start_slot,
|
||||
@@ -349,7 +349,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"BlocksByRange Response processed";
|
||||
"BlocksByRange outgoing response processed";
|
||||
"peer" => %peer_id,
|
||||
"start_slot" => req.start_slot,
|
||||
"current_slot" => current_slot,
|
||||
|
||||
@@ -38,8 +38,10 @@ struct ChainSegmentFailed {
|
||||
|
||||
impl<T: BeaconChainTypes> Worker<T> {
|
||||
/// Attempt to process a block received from a direct RPC request.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn process_rpc_block(
|
||||
self,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
@@ -56,17 +58,18 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
return;
|
||||
}
|
||||
// Check if the block is already being imported through another source
|
||||
let handle = match duplicate_cache.check_and_insert(block.canonical_root()) {
|
||||
let handle = match duplicate_cache.check_and_insert(block_root) {
|
||||
Some(handle) => handle,
|
||||
None => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Gossip block is being processed";
|
||||
"action" => "sending rpc block to reprocessing queue",
|
||||
"block_root" => %block.canonical_root(),
|
||||
"block_root" => %block_root,
|
||||
);
|
||||
// Send message to work reprocess queue to retry the block
|
||||
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
|
||||
block_root,
|
||||
block: block.clone(),
|
||||
process_type,
|
||||
seen_timestamp,
|
||||
@@ -74,13 +77,16 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
});
|
||||
|
||||
if reprocess_tx.try_send(reprocess_msg).is_err() {
|
||||
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block.canonical_root())
|
||||
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block_root)
|
||||
};
|
||||
return;
|
||||
}
|
||||
};
|
||||
let slot = block.slot();
|
||||
let result = self.chain.process_block(block, CountUnrealized::True).await;
|
||||
let result = self
|
||||
.chain
|
||||
.process_block(block_root, block, CountUnrealized::True)
|
||||
.await;
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
|
||||
@@ -11,17 +11,16 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::channel::mpsc::Sender;
|
||||
use futures::future::OptionFuture;
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::{
|
||||
prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use lighthouse_network::service::Network;
|
||||
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
|
||||
use lighthouse_network::{
|
||||
rpc::{GoodbyeReason, RPCResponseErrorCode},
|
||||
Context, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request,
|
||||
Response, Subnet,
|
||||
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
|
||||
};
|
||||
use lighthouse_network::{
|
||||
types::{GossipEncoding, GossipTopic},
|
||||
BehaviourEvent, MessageId, NetworkGlobals, PeerId,
|
||||
MessageId, NetworkEvent, NetworkGlobals, PeerId,
|
||||
};
|
||||
use slog::{crit, debug, error, info, o, trace, warn};
|
||||
use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
|
||||
@@ -171,7 +170,7 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
||||
/// A reference to the underlying beacon chain.
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
/// The underlying libp2p service that drives all the network interactions.
|
||||
libp2p: LibP2PService<RequestId, T::EthSpec>,
|
||||
libp2p: Network<RequestId, T::EthSpec>,
|
||||
/// An attestation and subnet manager service.
|
||||
attestation_service: AttestationService<T>,
|
||||
/// A sync committeee subnet manager service.
|
||||
@@ -273,8 +272,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
};
|
||||
|
||||
// launch libp2p service
|
||||
let (network_globals, mut libp2p) =
|
||||
LibP2PService::new(executor.clone(), service_context, &network_log).await?;
|
||||
let (mut libp2p, network_globals) =
|
||||
Network::new(executor.clone(), service_context, &network_log).await?;
|
||||
|
||||
// Repopulate the DHT with stored ENR's if discovery is not disabled.
|
||||
if !config.disable_discovery {
|
||||
@@ -284,7 +283,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
"Loading peers into the routing table"; "peers" => enrs_to_load.len()
|
||||
);
|
||||
for enr in enrs_to_load {
|
||||
libp2p.swarm.behaviour_mut().add_enr(enr.clone());
|
||||
libp2p.add_enr(enr.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -300,9 +299,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
)?;
|
||||
|
||||
// attestation subnet service
|
||||
let attestation_service =
|
||||
AttestationService::new(beacon_chain.clone(), config, &network_log);
|
||||
|
||||
let attestation_service = AttestationService::new(
|
||||
beacon_chain.clone(),
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
network_globals.local_enr().node_id().raw().into(),
|
||||
config,
|
||||
&network_log,
|
||||
);
|
||||
// sync committee subnet service
|
||||
let sync_committee_service =
|
||||
SyncCommitteeService::new(beacon_chain.clone(), config, &network_log);
|
||||
@@ -402,7 +405,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
_ = self.metrics_update.tick(), if self.metrics_enabled => {
|
||||
// update various network metrics
|
||||
metrics::update_gossip_metrics::<T::EthSpec>(
|
||||
self.libp2p.swarm.behaviour().gs(),
|
||||
self.libp2p.gossipsub(),
|
||||
&self.network_globals,
|
||||
);
|
||||
// update sync metrics
|
||||
@@ -429,7 +432,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
|
||||
Some(_) = &mut self.next_unsubscribe => {
|
||||
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
|
||||
self.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
|
||||
self.libp2p.unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
|
||||
info!(self.log, "Unsubscribed from old fork topics");
|
||||
self.next_unsubscribe = Box::pin(None.into());
|
||||
}
|
||||
@@ -439,7 +442,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
|
||||
let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
|
||||
info!(self.log, "Subscribing to new fork topics");
|
||||
self.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
|
||||
self.libp2p.subscribe_new_fork_topics(fork_digest);
|
||||
self.next_fork_subscriptions = Box::pin(None.into());
|
||||
}
|
||||
else {
|
||||
@@ -456,92 +459,90 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
/// Handle an event received from the network.
|
||||
async fn on_libp2p_event(
|
||||
&mut self,
|
||||
ev: Libp2pEvent<RequestId, T::EthSpec>,
|
||||
ev: NetworkEvent<RequestId, T::EthSpec>,
|
||||
shutdown_sender: &mut Sender<ShutdownReason>,
|
||||
) {
|
||||
match ev {
|
||||
Libp2pEvent::Behaviour(event) => match event {
|
||||
BehaviourEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
self.send_to_router(RouterMessage::PeerDialed(peer_id));
|
||||
}
|
||||
BehaviourEvent::PeerConnectedIncoming(_)
|
||||
| BehaviourEvent::PeerBanned(_)
|
||||
| BehaviourEvent::PeerUnbanned(_) => {
|
||||
// No action required for these events.
|
||||
}
|
||||
BehaviourEvent::PeerDisconnected(peer_id) => {
|
||||
self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
|
||||
}
|
||||
BehaviourEvent::RequestReceived {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
self.send_to_router(RouterMessage::PeerDialed(peer_id));
|
||||
}
|
||||
NetworkEvent::PeerConnectedIncoming(_)
|
||||
| NetworkEvent::PeerBanned(_)
|
||||
| NetworkEvent::PeerUnbanned(_) => {
|
||||
// No action required for these events.
|
||||
}
|
||||
NetworkEvent::PeerDisconnected(peer_id) => {
|
||||
self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
|
||||
}
|
||||
NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
});
|
||||
}
|
||||
BehaviourEvent::ResponseReceived {
|
||||
});
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id,
|
||||
id,
|
||||
response,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request_id: id,
|
||||
response,
|
||||
} => {
|
||||
self.send_to_router(RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
response,
|
||||
});
|
||||
}
|
||||
BehaviourEvent::RPCFailed { id, peer_id } => {
|
||||
self.send_to_router(RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
});
|
||||
}
|
||||
BehaviourEvent::StatusPeer(peer_id) => {
|
||||
self.send_to_router(RouterMessage::StatusPeer(peer_id));
|
||||
}
|
||||
BehaviourEvent::PubsubMessage {
|
||||
id,
|
||||
source,
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
match message {
|
||||
// attestation information gets processed in the attestation service
|
||||
PubsubMessage::Attestation(ref subnet_and_attestation) => {
|
||||
let subnet = subnet_and_attestation.0;
|
||||
let attestation = &subnet_and_attestation.1;
|
||||
// checks if we have an aggregator for the slot. If so, we should process
|
||||
// the attestation, else we just just propagate the Attestation.
|
||||
let should_process = self
|
||||
.attestation_service
|
||||
.should_process_attestation(subnet, attestation);
|
||||
self.send_to_router(RouterMessage::PubsubMessage(
|
||||
id,
|
||||
source,
|
||||
message,
|
||||
should_process,
|
||||
));
|
||||
}
|
||||
_ => {
|
||||
// all else is sent to the router
|
||||
self.send_to_router(RouterMessage::PubsubMessage(
|
||||
id, source, message, true,
|
||||
));
|
||||
}
|
||||
});
|
||||
}
|
||||
NetworkEvent::RPCFailed { id, peer_id } => {
|
||||
self.send_to_router(RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
});
|
||||
}
|
||||
NetworkEvent::StatusPeer(peer_id) => {
|
||||
self.send_to_router(RouterMessage::StatusPeer(peer_id));
|
||||
}
|
||||
NetworkEvent::PubsubMessage {
|
||||
id,
|
||||
source,
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
match message {
|
||||
// attestation information gets processed in the attestation service
|
||||
PubsubMessage::Attestation(ref subnet_and_attestation) => {
|
||||
let subnet = subnet_and_attestation.0;
|
||||
let attestation = &subnet_and_attestation.1;
|
||||
// checks if we have an aggregator for the slot. If so, we should process
|
||||
// the attestation, else we just just propagate the Attestation.
|
||||
let should_process = self
|
||||
.attestation_service
|
||||
.should_process_attestation(subnet, attestation);
|
||||
self.send_to_router(RouterMessage::PubsubMessage(
|
||||
id,
|
||||
source,
|
||||
message,
|
||||
should_process,
|
||||
));
|
||||
}
|
||||
_ => {
|
||||
// all else is sent to the router
|
||||
self.send_to_router(RouterMessage::PubsubMessage(
|
||||
id, source, message, true,
|
||||
));
|
||||
}
|
||||
}
|
||||
},
|
||||
Libp2pEvent::NewListenAddr(multiaddr) => {
|
||||
}
|
||||
NetworkEvent::NewListenAddr(multiaddr) => {
|
||||
self.network_globals
|
||||
.listen_multiaddrs
|
||||
.write()
|
||||
.push(multiaddr);
|
||||
}
|
||||
Libp2pEvent::ZeroListeners => {
|
||||
NetworkEvent::ZeroListeners => {
|
||||
let _ = shutdown_sender
|
||||
.send(ShutdownReason::Failure(
|
||||
"All listeners are closed. Unable to listen",
|
||||
@@ -588,7 +589,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
id,
|
||||
reason,
|
||||
} => {
|
||||
self.libp2p.respond_with_error(peer_id, id, error, reason);
|
||||
self.libp2p.send_error_reponse(peer_id, id, error, reason);
|
||||
}
|
||||
NetworkMessage::UPnPMappingEstablished {
|
||||
tcp_socket,
|
||||
@@ -599,8 +600,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
if let Some(tcp_socket) = tcp_socket {
|
||||
if let Err(e) = self
|
||||
.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.discovery_mut()
|
||||
.update_enr_tcp_port(tcp_socket.port())
|
||||
{
|
||||
@@ -613,8 +612,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
if let Some(udp_socket) = udp_socket {
|
||||
if let Err(e) = self
|
||||
.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.discovery_mut()
|
||||
.update_enr_udp_socket(udp_socket)
|
||||
{
|
||||
@@ -633,14 +630,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
"message_id" => %message_id,
|
||||
"validation_result" => ?validation_result
|
||||
);
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.report_message_validation_result(
|
||||
&propagation_source,
|
||||
message_id,
|
||||
validation_result,
|
||||
);
|
||||
self.libp2p.report_message_validation_result(
|
||||
&propagation_source,
|
||||
message_id,
|
||||
validation_result,
|
||||
);
|
||||
}
|
||||
NetworkMessage::Publish { messages } => {
|
||||
let mut topic_kinds = Vec::new();
|
||||
@@ -655,7 +649,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
"count" => messages.len(),
|
||||
"topics" => ?topic_kinds
|
||||
);
|
||||
self.libp2p.swarm.behaviour_mut().publish(messages);
|
||||
self.libp2p.publish(messages);
|
||||
}
|
||||
NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
@@ -693,7 +687,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
GossipEncoding::default(),
|
||||
fork_digest,
|
||||
);
|
||||
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
|
||||
if self.libp2p.subscribe(topic.clone()) {
|
||||
subscribed_topics.push(topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
||||
@@ -706,10 +700,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
||||
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
|
||||
// Update the ENR bitfield
|
||||
self.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
|
||||
if self.libp2p.subscribe(topic.clone()) {
|
||||
subscribed_topics.push(topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
||||
@@ -720,17 +714,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
for subnet_id in 0..subnet_max {
|
||||
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
|
||||
// Update the ENR bitfield
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_enr_subnet(subnet, true);
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic = GossipTopic::new(
|
||||
subnet.into(),
|
||||
GossipEncoding::default(),
|
||||
fork_digest,
|
||||
);
|
||||
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
|
||||
if self.libp2p.subscribe(topic.clone()) {
|
||||
subscribed_topics.push(topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
||||
@@ -782,8 +773,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
if let Some(active_validators) = active_validators_opt {
|
||||
if self
|
||||
.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_gossipsub_parameters(active_validators, slot)
|
||||
.is_err()
|
||||
{
|
||||
@@ -811,33 +800,24 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic =
|
||||
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
self.libp2p.swarm.behaviour_mut().subscribe(topic);
|
||||
self.libp2p.subscribe(topic);
|
||||
}
|
||||
}
|
||||
SubnetServiceMessage::Unsubscribe(subnet) => {
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic =
|
||||
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
self.libp2p.swarm.behaviour_mut().unsubscribe(topic);
|
||||
self.libp2p.unsubscribe(topic);
|
||||
}
|
||||
}
|
||||
SubnetServiceMessage::EnrAdd(subnet) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_enr_subnet(subnet, true);
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
}
|
||||
SubnetServiceMessage::EnrRemove(subnet) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_enr_subnet(subnet, false);
|
||||
self.libp2p.update_enr_subnet(subnet, false);
|
||||
}
|
||||
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.discover_subnet_peers(subnets_to_discover);
|
||||
self.libp2p.discover_subnet_peers(subnets_to_discover);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -848,33 +828,24 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic =
|
||||
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
self.libp2p.swarm.behaviour_mut().subscribe(topic);
|
||||
self.libp2p.subscribe(topic);
|
||||
}
|
||||
}
|
||||
SubnetServiceMessage::Unsubscribe(subnet) => {
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic =
|
||||
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
self.libp2p.swarm.behaviour_mut().unsubscribe(topic);
|
||||
self.libp2p.unsubscribe(topic);
|
||||
}
|
||||
}
|
||||
SubnetServiceMessage::EnrAdd(subnet) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_enr_subnet(subnet, true);
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
}
|
||||
SubnetServiceMessage::EnrRemove(subnet) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_enr_subnet(subnet, false);
|
||||
self.libp2p.update_enr_subnet(subnet, false);
|
||||
}
|
||||
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.discover_subnet_peers(subnets_to_discover);
|
||||
self.libp2p.discover_subnet_peers(subnets_to_discover);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -892,10 +863,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
);
|
||||
fork_context.update_current_fork(*new_fork_name);
|
||||
|
||||
self.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_fork_version(new_enr_fork_id);
|
||||
self.libp2p.update_fork_version(new_enr_fork_id);
|
||||
// Reinitialize the next_fork_update
|
||||
self.next_fork_update = Box::pin(next_fork_delay(&self.beacon_chain).into());
|
||||
|
||||
@@ -944,7 +912,7 @@ fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
|
||||
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
||||
fn drop(&mut self) {
|
||||
// network thread is terminating
|
||||
let enrs = self.libp2p.swarm.behaviour_mut().enr_entries();
|
||||
let enrs = self.libp2p.enr_entries();
|
||||
debug!(
|
||||
self.log,
|
||||
"Persisting DHT to store";
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! determines whether attestations should be aggregated and/or passed to the beacon node.
|
||||
|
||||
use super::SubnetServiceMessage;
|
||||
#[cfg(test)]
|
||||
#[cfg(any(test, feature = "deterministic_long_lived_attnets"))]
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::pin::Pin;
|
||||
@@ -15,6 +15,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use delay_map::{HashMapDelay, HashSetDelay};
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
use rand::seq::SliceRandom;
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use slot_clock::SlotClock;
|
||||
@@ -28,6 +29,7 @@ use crate::metrics;
|
||||
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
|
||||
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from
|
||||
/// the random gossip topics that we subscribed to due to the validator connection.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150;
|
||||
/// The fraction of a slot that we subscribe to a subnet before the required slot.
|
||||
///
|
||||
@@ -70,6 +72,9 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// Subnets we are currently subscribed to as long lived subscriptions.
|
||||
///
|
||||
/// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
long_lived_subscriptions: HashSet<SubnetId>,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
long_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
|
||||
|
||||
/// Short lived subscriptions that need to be done in the future.
|
||||
@@ -83,6 +88,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update
|
||||
/// our ENR.
|
||||
/// This is a set of validator indices.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
known_validators: HashSetDelay<u64>,
|
||||
|
||||
/// The waker for the current thread.
|
||||
@@ -95,8 +101,17 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
subscribe_all_subnets: bool,
|
||||
|
||||
/// For how many slots we subscribe to long lived subnets.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
long_lived_subnet_subscription_slots: u64,
|
||||
|
||||
/// Our Discv5 node_id.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
node_id: ethereum_types::U256,
|
||||
|
||||
/// Future used to manage subscribing and unsubscribing from long lived subnets.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
|
||||
|
||||
/// The logger for the attestation service.
|
||||
log: slog::Logger,
|
||||
}
|
||||
@@ -104,6 +119,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/* Public functions */
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
pub fn new(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
config: &NetworkConfig,
|
||||
@@ -145,31 +161,85 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
pub fn new(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
node_id: ethereum_types::U256,
|
||||
config: &NetworkConfig,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let log = log.new(o!("service" => "attestation_service"));
|
||||
|
||||
// Calculate the random subnet duration from the spec constants.
|
||||
let slot_duration = beacon_chain.slot_clock.slot_duration();
|
||||
|
||||
slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node);
|
||||
|
||||
let track_validators = !config.import_all_attestations;
|
||||
let aggregate_validators_on_subnet =
|
||||
track_validators.then(|| HashSetDelay::new(slot_duration));
|
||||
let mut service = AttestationService {
|
||||
events: VecDeque::with_capacity(10),
|
||||
beacon_chain,
|
||||
short_lived_subscriptions: HashMapDelay::new(slot_duration),
|
||||
long_lived_subscriptions: HashSet::default(),
|
||||
scheduled_short_lived_subscriptions: HashSetDelay::default(),
|
||||
aggregate_validators_on_subnet,
|
||||
waker: None,
|
||||
discovery_disabled: config.disable_discovery,
|
||||
subscribe_all_subnets: config.subscribe_all_subnets,
|
||||
node_id,
|
||||
next_long_lived_subscription_event: {
|
||||
// Set a dummy sleep. Calculating the current subnet subscriptions will update this
|
||||
// value with a smarter timing
|
||||
Box::pin(tokio::time::sleep(Duration::from_secs(1)))
|
||||
},
|
||||
log,
|
||||
};
|
||||
service.recompute_long_lived_subnets();
|
||||
service
|
||||
}
|
||||
|
||||
/// Return count of all currently subscribed subnets (long-lived **and** short-lived).
|
||||
#[cfg(test)]
|
||||
pub fn subscription_count(&self) -> usize {
|
||||
if self.subscribe_all_subnets {
|
||||
self.beacon_chain.spec.attestation_subnet_count as usize
|
||||
} else {
|
||||
self.short_lived_subscriptions
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let count = self
|
||||
.short_lived_subscriptions
|
||||
.keys()
|
||||
.chain(self.long_lived_subscriptions.iter())
|
||||
.collect::<HashSet<_>>()
|
||||
.len();
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
let count = self
|
||||
.short_lived_subscriptions
|
||||
.keys()
|
||||
.chain(self.long_lived_subscriptions.keys())
|
||||
.collect::<HashSet<_>>()
|
||||
.len()
|
||||
.len();
|
||||
count
|
||||
}
|
||||
}
|
||||
|
||||
/// Give access to the current subscriptions for testing purposes.
|
||||
/// Returns whether we are subscribed to a subnet for testing purposes.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn subscriptions(
|
||||
pub(crate) fn is_subscribed(
|
||||
&self,
|
||||
subnet_id: &SubnetId,
|
||||
subscription_kind: SubscriptionKind,
|
||||
) -> &HashMapDelay<SubnetId, Slot> {
|
||||
) -> bool {
|
||||
match subscription_kind {
|
||||
SubscriptionKind::LongLived => &self.long_lived_subscriptions,
|
||||
SubscriptionKind::ShortLived => &self.short_lived_subscriptions,
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id),
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id),
|
||||
SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a list of validator subscriptions.
|
||||
///
|
||||
/// This will:
|
||||
@@ -197,6 +267,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
"Validator subscription";
|
||||
"subscription" => ?subscription,
|
||||
);
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
self.add_known_validator(subscription.validator_index);
|
||||
|
||||
let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>(
|
||||
@@ -267,6 +338,111 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn recompute_long_lived_subnets(&mut self) {
|
||||
// Ensure the next computation is scheduled even if assigning subnets fails.
|
||||
let next_subscription_event = self
|
||||
.recompute_long_lived_subnets_inner()
|
||||
.unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration());
|
||||
|
||||
debug!(self.log, "Recomputing deterministic long lived attnets");
|
||||
self.next_long_lived_subscription_event =
|
||||
Box::pin(tokio::time::sleep(next_subscription_event));
|
||||
|
||||
if let Some(waker) = self.waker.as_ref() {
|
||||
waker.wake_by_ref();
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the long lived subnets the node should be subscribed to during the current epoch and
|
||||
/// the remaining duration for which they remain valid.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> {
|
||||
let current_epoch = self.beacon_chain.epoch().map_err(
|
||||
|e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e),
|
||||
)?;
|
||||
|
||||
let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>(
|
||||
self.node_id,
|
||||
current_epoch,
|
||||
&self.beacon_chain.spec,
|
||||
)
|
||||
.map_err(|e| error!(self.log, "Could not compute subnets for current epoch"; "err" => e))?;
|
||||
|
||||
let next_subscription_slot =
|
||||
next_subscription_epoch.start_slot(T::EthSpec::slots_per_epoch());
|
||||
let next_subscription_event = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(next_subscription_slot)
|
||||
.ok_or_else(|| {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to compute duration to next to long lived subscription event"
|
||||
)
|
||||
})?;
|
||||
|
||||
self.update_long_lived_subnets(subnets.collect());
|
||||
|
||||
Ok(next_subscription_event)
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
|
||||
pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet<SubnetId>) {
|
||||
self.update_long_lived_subnets(subnets)
|
||||
}
|
||||
|
||||
/// Updates the long lived subnets.
|
||||
///
|
||||
/// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr
|
||||
/// updated accordingly.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) {
|
||||
for subnet in &subnets {
|
||||
// Add the events for those subnets that are new as long lived subscriptions.
|
||||
if !self.long_lived_subscriptions.contains(subnet) {
|
||||
// Check if this subnet is new and send the subscription event if needed.
|
||||
if !self.short_lived_subscriptions.contains_key(subnet) {
|
||||
debug!(self.log, "Subscribing to subnet";
|
||||
"subnet" => ?subnet,
|
||||
"subscription_kind" => ?SubscriptionKind::LongLived,
|
||||
);
|
||||
self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation(
|
||||
*subnet,
|
||||
)));
|
||||
}
|
||||
self.queue_event(SubnetServiceMessage::EnrAdd(Subnet::Attestation(*subnet)));
|
||||
if !self.discovery_disabled {
|
||||
self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet: Subnet::Attestation(*subnet),
|
||||
min_ttl: None,
|
||||
}]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for subnets that are being removed
|
||||
std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets);
|
||||
for subnet in subnets {
|
||||
if !self.long_lived_subscriptions.contains(&subnet) {
|
||||
if !self.short_lived_subscriptions.contains_key(&subnet) {
|
||||
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived);
|
||||
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
|
||||
subnet,
|
||||
)));
|
||||
}
|
||||
|
||||
self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Overwrites the long lived subscriptions for testing.
|
||||
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
|
||||
pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet<SubnetId>) {
|
||||
self.long_lived_subscriptions = subnets
|
||||
}
|
||||
|
||||
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
|
||||
/// verification, re-propagates and returns false.
|
||||
pub fn should_process_attestation(
|
||||
@@ -377,6 +553,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// This is a current or past slot, we subscribe immediately.
|
||||
self.subscribe_to_subnet_immediately(
|
||||
subnet_id,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived,
|
||||
slot + 1,
|
||||
)?;
|
||||
@@ -391,6 +568,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
|
||||
/// Updates the `known_validators` mapping and subscribes to long lived subnets if required.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn add_known_validator(&mut self, validator_index: u64) {
|
||||
let previously_known = self.known_validators.contains_key(&validator_index);
|
||||
// Add the new validator or update the current timeout for a known validator.
|
||||
@@ -405,6 +583,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// Subscribe to long-lived random subnets and update the local ENR bitfield.
|
||||
/// The number of subnets to subscribe depends on the number of active validators and number of
|
||||
/// current subscriptions.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn subscribe_to_random_subnets(&mut self) {
|
||||
if self.subscribe_all_subnets {
|
||||
// This case is not handled by this service.
|
||||
@@ -468,9 +647,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// Checks that the time in which the subscription would end is not in the past. If we are
|
||||
/// already subscribed, extends the timeout if necessary. If this is a new subscription, we send
|
||||
/// out the appropriate events.
|
||||
///
|
||||
/// On determinist long lived subnets, this is only used for short lived subscriptions.
|
||||
fn subscribe_to_subnet_immediately(
|
||||
&mut self,
|
||||
subnet_id: SubnetId,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
subscription_kind: SubscriptionKind,
|
||||
end_slot: Slot,
|
||||
) -> Result<(), &'static str> {
|
||||
@@ -490,9 +672,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
return Err("Time when subscription would end has already passed.");
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let subscription_kind = SubscriptionKind::ShortLived;
|
||||
|
||||
// We need to check and add a subscription for the right kind, regardless of the presence
|
||||
// of the subnet as a subscription of the other kind. This is mainly since long lived
|
||||
// subscriptions can be removed at any time when a validator goes offline.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind {
|
||||
SubscriptionKind::ShortLived => (
|
||||
&mut self.short_lived_subscriptions,
|
||||
@@ -504,6 +690,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
),
|
||||
};
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let (subscriptions, already_subscribed_as_other_kind) = (
|
||||
&mut self.short_lived_subscriptions,
|
||||
self.long_lived_subscriptions.contains(&subnet_id),
|
||||
);
|
||||
|
||||
match subscriptions.get(&subnet_id) {
|
||||
Some(current_end_slot) => {
|
||||
// We are already subscribed. Check if we need to extend the subscription.
|
||||
@@ -535,6 +727,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
|
||||
// If this is a new long lived subscription, send out the appropriate events.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
if SubscriptionKind::LongLived == subscription_kind {
|
||||
let subnet = Subnet::Attestation(subnet_id);
|
||||
// Advertise this subnet in our ENR.
|
||||
@@ -564,26 +757,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
///
|
||||
/// This function selects a new subnet to join, or extends the expiry if there are no more
|
||||
/// available subnets to choose from.
|
||||
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId, end_slot: Slot) {
|
||||
let subnet_count = self.beacon_chain.spec.attestation_subnet_count;
|
||||
if self.long_lived_subscriptions.len() == (subnet_count - 1) as usize {
|
||||
let end_slot = end_slot + self.long_lived_subnet_subscription_slots;
|
||||
// This is just an extra accuracy precaution, we could use the default timeout if
|
||||
// needed.
|
||||
if let Some(time_to_subscription_end) =
|
||||
self.beacon_chain.slot_clock.duration_to_slot(end_slot)
|
||||
{
|
||||
// We are at capacity, simply increase the timeout of the current subnet.
|
||||
self.long_lived_subscriptions.insert_at(
|
||||
subnet_id,
|
||||
end_slot + 1,
|
||||
time_to_subscription_end,
|
||||
);
|
||||
} else {
|
||||
self.long_lived_subscriptions.insert(subnet_id, end_slot);
|
||||
}
|
||||
return;
|
||||
}
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) {
|
||||
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
|
||||
|
||||
// Remove the ENR bitfield bit and choose a new random on from the available subnets
|
||||
// Subscribe to a new random subnet.
|
||||
@@ -594,12 +770,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// subscription of the other kind. For long lived subscriptions, it also removes the
|
||||
// advertisement from our ENR.
|
||||
fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
|
||||
let other_subscriptions = match subscription_kind {
|
||||
SubscriptionKind::LongLived => &self.short_lived_subscriptions,
|
||||
SubscriptionKind::ShortLived => &self.long_lived_subscriptions,
|
||||
let exists_in_other_subscriptions = match subscription_kind {
|
||||
SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id),
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id),
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id),
|
||||
};
|
||||
|
||||
if !other_subscriptions.contains_key(&subnet_id) {
|
||||
if !exists_in_other_subscriptions {
|
||||
// Subscription no longer exists as short lived or long lived.
|
||||
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind);
|
||||
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
|
||||
@@ -621,6 +800,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// We don't keep track of a specific validator to random subnet, rather the ratio of active
|
||||
/// validators to random subnets. So when a validator goes offline, we can simply remove the
|
||||
/// allocated amount of random subnets.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn handle_known_validator_expiry(&mut self) {
|
||||
// Calculate how many subnets should we remove.
|
||||
let extra_subnet_count = {
|
||||
@@ -677,6 +857,7 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
|
||||
// Process first any known validator expiries, since these affect how many long lived
|
||||
// subnets we need.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
match self.known_validators.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(_validator_index))) => {
|
||||
self.handle_known_validator_expiry();
|
||||
@@ -687,12 +868,19 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
match self.next_long_lived_subscription_event.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => self.recompute_long_lived_subnets(),
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
// Process scheduled subscriptions that might be ready, since those can extend a soon to
|
||||
// expire subscription.
|
||||
match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(
|
||||
subnet_id,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived,
|
||||
slot + 1,
|
||||
) {
|
||||
@@ -717,9 +905,10 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
}
|
||||
|
||||
// Process any random subnet expiries.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
match self.long_lived_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok((subnet_id, end_slot)))) => {
|
||||
self.handle_random_subnet_expiry(subnet_id, end_slot)
|
||||
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
|
||||
self.handle_random_subnet_expiry(subnet_id)
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
|
||||
|
||||
@@ -123,7 +123,15 @@ fn get_attestation_service(
|
||||
|
||||
let beacon_chain = CHAIN.chain.clone();
|
||||
|
||||
AttestationService::new(beacon_chain, &config, &log)
|
||||
AttestationService::new(
|
||||
beacon_chain,
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
lighthouse_network::discv5::enr::NodeId::random()
|
||||
.raw()
|
||||
.into(),
|
||||
&config,
|
||||
&log,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> {
|
||||
@@ -170,6 +178,9 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
|
||||
|
||||
mod attestation_service {
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
|
||||
|
||||
use super::*;
|
||||
@@ -190,6 +201,7 @@ mod attestation_service {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn get_subscriptions(
|
||||
validator_count: u64,
|
||||
slot: Slot,
|
||||
@@ -268,8 +280,7 @@ mod attestation_service {
|
||||
// If the long lived and short lived subnets are the same, there should be no more events
|
||||
// as we don't resubscribe already subscribed subnets.
|
||||
if !attestation_service
|
||||
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
|
||||
.contains_key(&subnet_id)
|
||||
.is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived)
|
||||
{
|
||||
assert_eq!(expected[..], events[3..]);
|
||||
}
|
||||
@@ -352,11 +363,12 @@ mod attestation_service {
|
||||
|
||||
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
|
||||
|
||||
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different.
|
||||
if !attestation_service
|
||||
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
|
||||
.contains_key(&subnet_id1)
|
||||
{
|
||||
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are
|
||||
// different.
|
||||
if !attestation_service.is_subscribed(
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(expected, events[3]);
|
||||
assert_eq!(attestation_service.subscription_count(), 2);
|
||||
} else {
|
||||
@@ -366,11 +378,12 @@ mod attestation_service {
|
||||
// Get event for 1 more slot duration, we should get the unsubscribe event now.
|
||||
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
|
||||
|
||||
// If the long lived and short lived subnets are different, we should get an unsubscription event.
|
||||
if !attestation_service
|
||||
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
|
||||
.contains_key(&subnet_id1)
|
||||
{
|
||||
// If the long lived and short lived subnets are different, we should get an unsubscription
|
||||
// event.
|
||||
if !attestation_service.is_subscribed(
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(
|
||||
[SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
|
||||
subnet_id1
|
||||
@@ -383,6 +396,7 @@ mod attestation_service {
|
||||
assert_eq!(attestation_service.subscription_count(), 1);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
#[tokio::test]
|
||||
async fn subscribe_all_random_subnets() {
|
||||
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
|
||||
@@ -440,6 +454,7 @@ mod attestation_service {
|
||||
// test completed successfully
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
#[tokio::test]
|
||||
async fn subscribe_all_random_subnets_plus_one() {
|
||||
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
|
||||
@@ -573,10 +588,10 @@ mod attestation_service {
|
||||
let expected_unsubscription =
|
||||
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
|
||||
|
||||
if !attestation_service
|
||||
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
|
||||
.contains_key(&subnet_id1)
|
||||
{
|
||||
if !attestation_service.is_subscribed(
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(expected_subscription, events[3]);
|
||||
// fourth is a discovery event
|
||||
assert_eq!(expected_unsubscription, events[5]);
|
||||
@@ -600,10 +615,10 @@ mod attestation_service {
|
||||
|
||||
let second_subscribe_event = get_events(&mut attestation_service, None, 2).await;
|
||||
// If the long lived and short lived subnets are different, we should get an unsubscription event.
|
||||
if !attestation_service
|
||||
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
|
||||
.contains_key(&subnet_id1)
|
||||
{
|
||||
if !attestation_service.is_subscribed(
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(
|
||||
[SubnetServiceMessage::Subscribe(Subnet::Attestation(
|
||||
subnet_id1
|
||||
@@ -612,6 +627,43 @@ mod attestation_service {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
async fn test_update_deterministic_long_lived_subnets() {
|
||||
let mut attestation_service = get_attestation_service(None);
|
||||
let new_subnet = SubnetId::new(1);
|
||||
let maintained_subnet = SubnetId::new(2);
|
||||
let removed_subnet = SubnetId::new(3);
|
||||
|
||||
attestation_service
|
||||
.set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet]));
|
||||
// clear initial events
|
||||
let _events = get_events(&mut attestation_service, None, 1).await;
|
||||
|
||||
attestation_service
|
||||
.update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet]));
|
||||
|
||||
let events = get_events(&mut attestation_service, None, 1).await;
|
||||
let new_subnet = Subnet::Attestation(new_subnet);
|
||||
let removed_subnet = Subnet::Attestation(removed_subnet);
|
||||
assert_eq!(
|
||||
events,
|
||||
[
|
||||
// events for the new subnet
|
||||
SubnetServiceMessage::Subscribe(new_subnet),
|
||||
SubnetServiceMessage::EnrAdd(new_subnet),
|
||||
SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet: new_subnet,
|
||||
min_ttl: None
|
||||
}]),
|
||||
// events for the removed subnet
|
||||
SubnetServiceMessage::Unsubscribe(removed_subnet),
|
||||
SubnetServiceMessage::EnrRemove(removed_subnet),
|
||||
]
|
||||
);
|
||||
println!("{events:?}")
|
||||
}
|
||||
}
|
||||
|
||||
mod sync_committee_service {
|
||||
|
||||
@@ -30,6 +30,8 @@ mod single_block_lookup;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
|
||||
|
||||
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
|
||||
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
|
||||
|
||||
@@ -101,11 +103,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
/// called in order to find the block's parent.
|
||||
pub fn search_parent(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let block_root = block.canonical_root();
|
||||
let parent_root = block.parent_root();
|
||||
// If this block or it's parent is part of a known failed chain, ignore it.
|
||||
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
|
||||
@@ -125,7 +127,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
return;
|
||||
}
|
||||
|
||||
let parent_lookup = ParentLookup::new(block, peer_id);
|
||||
let parent_lookup = ParentLookup::new(block_root, block, peer_id);
|
||||
self.request_parent(parent_lookup, cx);
|
||||
}
|
||||
|
||||
@@ -153,10 +155,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
};
|
||||
|
||||
match request.get_mut().verify_block(block) {
|
||||
Ok(Some(block)) => {
|
||||
Ok(Some((block_root, block))) => {
|
||||
// This is the correct block, send it for processing
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlock { id },
|
||||
@@ -217,11 +220,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
};
|
||||
|
||||
match parent_lookup.verify_block(block, &mut self.failed_chains) {
|
||||
Ok(Some(block)) => {
|
||||
Ok(Some((block_root, block))) => {
|
||||
// Block is correct, send to the beacon processor.
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
@@ -420,7 +424,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
|
||||
}
|
||||
BlockError::ParentUnknown(block) => {
|
||||
self.search_parent(block, peer_id, cx);
|
||||
self.search_parent(root, block, peer_id, cx);
|
||||
}
|
||||
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
|
||||
// These errors indicate that the execution layer is offline
|
||||
@@ -625,6 +629,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
fn send_block_for_processing(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
duration: Duration,
|
||||
process_type: BlockProcessType,
|
||||
@@ -632,8 +637,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
) -> Result<(), ()> {
|
||||
match cx.processor_channel_if_enabled() {
|
||||
Some(beacon_processor_send) => {
|
||||
trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type);
|
||||
let event = WorkEvent::rpc_beacon_block(block, duration, process_type);
|
||||
trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type);
|
||||
let event = WorkEvent::rpc_beacon_block(block_root, block, duration, process_type);
|
||||
if let Err(e) = beacon_processor_send.try_send(event) {
|
||||
error!(
|
||||
self.log,
|
||||
@@ -646,7 +651,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block.canonical_root());
|
||||
trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block_root);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::RootBlockTuple;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use lighthouse_network::PeerId;
|
||||
use std::sync::Arc;
|
||||
@@ -58,11 +59,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
.any(|d_block| d_block.as_ref() == block)
|
||||
}
|
||||
|
||||
pub fn new(block: Arc<SignedBeaconBlock<T::EthSpec>>, peer_id: PeerId) -> Self {
|
||||
pub fn new(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
peer_id: PeerId,
|
||||
) -> Self {
|
||||
let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id);
|
||||
|
||||
Self {
|
||||
chain_hash: block.canonical_root(),
|
||||
chain_hash: block_root,
|
||||
downloaded_blocks: vec![block],
|
||||
current_parent_request,
|
||||
current_parent_request_id: None,
|
||||
@@ -130,12 +135,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
&mut self,
|
||||
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
|
||||
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, VerifyError> {
|
||||
let block = self.current_parent_request.verify_block(block)?;
|
||||
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> {
|
||||
let root_and_block = self.current_parent_request.verify_block(block)?;
|
||||
|
||||
// check if the parent of this block isn't in the failed cache. If it is, this chain should
|
||||
// be dropped and the peer downscored.
|
||||
if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) {
|
||||
if let Some(parent_root) = root_and_block
|
||||
.as_ref()
|
||||
.map(|(_, block)| block.parent_root())
|
||||
{
|
||||
if failed_chains.contains(&parent_root) {
|
||||
self.current_parent_request.register_failure_downloading();
|
||||
self.current_parent_request_id = None;
|
||||
@@ -143,7 +151,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(block)
|
||||
Ok(root_and_block)
|
||||
}
|
||||
|
||||
pub fn get_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::RootBlockTuple;
|
||||
use beacon_chain::get_block_root;
|
||||
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
|
||||
use rand::seq::IteratorRandom;
|
||||
use ssz_types::VariableList;
|
||||
@@ -104,7 +106,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
|
||||
pub fn verify_block<T: EthSpec>(
|
||||
&mut self,
|
||||
block: Option<Arc<SignedBeaconBlock<T>>>,
|
||||
) -> Result<Option<Arc<SignedBeaconBlock<T>>>, VerifyError> {
|
||||
) -> Result<Option<RootBlockTuple<T>>, VerifyError> {
|
||||
match self.state {
|
||||
State::AwaitingDownload => {
|
||||
self.register_failure_downloading();
|
||||
@@ -112,7 +114,10 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
|
||||
}
|
||||
State::Downloading { peer_id } => match block {
|
||||
Some(block) => {
|
||||
if block.canonical_root() != self.hash {
|
||||
// Compute the block root using this specific function so that we can get timing
|
||||
// metrics.
|
||||
let block_root = get_block_root(&block);
|
||||
if block_root != self.hash {
|
||||
// return an error and drop the block
|
||||
// NOTE: we take this is as a download failure to prevent counting the
|
||||
// attempt as a chain failure, but simply a peer failure.
|
||||
@@ -121,7 +126,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
|
||||
} else {
|
||||
// Return the block for processing.
|
||||
self.state = State::Processing { peer_id };
|
||||
Ok(Some(block))
|
||||
Ok(Some((block_root, block)))
|
||||
}
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -272,7 +272,7 @@ fn test_parent_lookup_happy_path() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
let id = rig.expect_parent_request();
|
||||
|
||||
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
|
||||
@@ -300,7 +300,7 @@ fn test_parent_lookup_wrong_response() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
let id1 = rig.expect_parent_request();
|
||||
|
||||
// Peer sends the wrong block, peer should be penalized and the block re-requested.
|
||||
@@ -337,7 +337,7 @@ fn test_parent_lookup_empty_response() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
let id1 = rig.expect_parent_request();
|
||||
|
||||
// Peer sends an empty response, peer should be penalized and the block re-requested.
|
||||
@@ -369,7 +369,7 @@ fn test_parent_lookup_rpc_failure() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
let id1 = rig.expect_parent_request();
|
||||
|
||||
// The request fails. It should be tried again.
|
||||
@@ -396,10 +396,11 @@ fn test_parent_lookup_too_many_attempts() {
|
||||
|
||||
let parent = rig.rand_block();
|
||||
let block = rig.block_with_parent(parent.canonical_root());
|
||||
let chain_hash = block.canonical_root();
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
|
||||
let id = rig.expect_parent_request();
|
||||
match i % 2 {
|
||||
@@ -435,7 +436,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(block_hash, Arc::new(block), peer_id, &mut cx);
|
||||
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
|
||||
assert!(!bl.failed_chains.contains(&block_hash));
|
||||
let id = rig.expect_parent_request();
|
||||
@@ -469,7 +470,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(block_hash, Arc::new(block), peer_id, &mut cx);
|
||||
|
||||
// Fail downloading the block
|
||||
for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
|
||||
@@ -510,7 +511,7 @@ fn test_parent_lookup_too_deep() {
|
||||
let peer_id = PeerId::random();
|
||||
let trigger_block = blocks.pop().unwrap();
|
||||
let chain_hash = trigger_block.canonical_root();
|
||||
bl.search_parent(Arc::new(trigger_block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(trigger_block), peer_id, &mut cx);
|
||||
|
||||
for block in blocks.into_iter().rev() {
|
||||
let id = rig.expect_parent_request();
|
||||
@@ -537,7 +538,12 @@ fn test_parent_lookup_disconnection() {
|
||||
let (mut bl, mut cx, mut rig) = TestRig::test_setup(None);
|
||||
let peer_id = PeerId::random();
|
||||
let trigger_block = rig.rand_block();
|
||||
bl.search_parent(Arc::new(trigger_block), peer_id, &mut cx);
|
||||
bl.search_parent(
|
||||
trigger_block.canonical_root(),
|
||||
Arc::new(trigger_block),
|
||||
peer_id,
|
||||
&mut cx,
|
||||
);
|
||||
bl.peer_disconnected(&peer_id, &mut cx);
|
||||
assert!(bl.parent_queue.is_empty());
|
||||
}
|
||||
@@ -581,7 +587,7 @@ fn test_parent_lookup_ignored_response() {
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
// Trigger the request
|
||||
bl.search_parent(Arc::new(block), peer_id, &mut cx);
|
||||
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
|
||||
let id = rig.expect_parent_request();
|
||||
|
||||
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
|
||||
|
||||
@@ -94,7 +94,7 @@ pub enum SyncMessage<T: EthSpec> {
|
||||
},
|
||||
|
||||
/// A block with an unknown parent has been received.
|
||||
UnknownBlock(PeerId, Arc<SignedBeaconBlock<T>>),
|
||||
UnknownBlock(PeerId, Arc<SignedBeaconBlock<T>>, Hash256),
|
||||
|
||||
/// A peer has sent an object that references a block that is unknown. This triggers the
|
||||
/// manager to attempt to find the block matching the unknown hash.
|
||||
@@ -503,7 +503,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
} => {
|
||||
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
|
||||
}
|
||||
SyncMessage::UnknownBlock(peer_id, block) => {
|
||||
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
|
||||
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
|
||||
if !self.network_globals.sync_state.read().is_synced() {
|
||||
let head_slot = self.chain.canonical_head.cached_head().head_slot();
|
||||
@@ -523,7 +523,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
&& self.network.is_execution_engine_online()
|
||||
{
|
||||
self.block_lookups
|
||||
.search_parent(block, peer_id, &mut self.network);
|
||||
.search_parent(block_root, block, peer_id, &mut self.network);
|
||||
}
|
||||
}
|
||||
SyncMessage::UnknownBlockHash(peer_id, block_hash) => {
|
||||
|
||||
Reference in New Issue
Block a user