Merge remote-tracking branch 'sigp/unstable' into electra_attestation_changes

This commit is contained in:
realbigsean
2024-05-06 17:26:43 -04:00
22 changed files with 404 additions and 228 deletions

View File

@@ -307,11 +307,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
@@ -319,37 +315,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
self.active_requests.remove(peer_id);
// Remove the peer from the participation list
self.participating_peers.remove(peer_id);

View File

@@ -1,9 +1,7 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
@@ -26,11 +24,6 @@ pub enum ResponseType {
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
/// Wrapper around bool to prevent mixing this argument with `BlockIsProcessed`
pub(crate) struct AwaitingParent(pub bool);
/// Wrapper around bool to prevent mixing this argument with `AwaitingParent`
pub(crate) struct BlockIsProcessed(pub bool);
/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
@@ -43,52 +36,6 @@ pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
type VerifiedResponseType: Clone;
/// Potentially makes progress on this request if it's in a progress-able state
fn continue_request(
&mut self,
id: Id,
awaiting_parent: AwaitingParent,
downloaded_block_expected_blobs: Option<usize>,
block_is_processed: BlockIsProcessed,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}
let peer_id = self
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;
// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent.0
&& (block_is_processed.0 || matches!(Self::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = self.get_state_mut().maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
}
}
Ok(())
}
/// Request the network context to prepare a request of a component of `block_root`. If the
/// request is not necessary because the component is already known / processed, return false.
/// Return true if it sent a request and we can expect an event back from the network.

View File

@@ -16,7 +16,7 @@ use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
use slog::{debug, error, trace, warn, Logger};
use slog::{debug, error, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
@@ -233,13 +233,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.iter_mut()
.find(|(_id, lookup)| lookup.is_for_block(block_root))
{
trace!(self.log, "Adding peer to existing single block lookup"; "block_root" => %block_root);
lookup.add_peers(peers);
for peer in peers {
if lookup.add_peer(*peer) {
debug!(self.log, "Adding peer to existing single block lookup"; "block_root" => ?block_root, "peer" => ?peer);
}
}
if let Some(block_component) = block_component {
let component_type = block_component.get_type();
let imported = lookup.add_child_components(block_component);
if !imported {
debug!(self.log, "Lookup child component ignored"; "block_root" => %block_root, "type" => component_type);
debug!(self.log, "Lookup child component ignored"; "block_root" => ?block_root, "type" => component_type);
}
}
return true;
@@ -252,10 +256,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.iter()
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
{
warn!(self.log, "Ignoring child lookup parent lookup not found"; "block_root" => ?awaiting_parent);
return false;
}
}
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
let msg = if block_component.is_some() {
"Searching for components of a block with unknown parent"
} else {
@@ -265,14 +274,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.log,
"{}", msg;
"peer_ids" => ?peers,
"block" => ?block_root,
"block_root" => ?block_root,
"id" => lookup.id,
);
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
// Add block components to the new request
if let Some(block_component) = block_component {
lookup.add_child_components(block_component);
@@ -338,7 +344,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok((response, seen_timestamp)) => {
debug!(self.log,
"Received lookup download success";
"block_root" => %block_root,
"block_root" => ?block_root,
"id" => id,
"peer_id" => %peer_id,
"response_type" => ?response_type,
);
@@ -357,7 +364,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Err(e) => {
debug!(self.log,
"Received lookup download failure";
"block_root" => %block_root,
"block_root" => ?block_root,
"id" => id,
"peer_id" => %peer_id,
"response_type" => ?response_type,
"error" => %e,
@@ -374,16 +382,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
/* Check disconnection for single lookups */
self.single_block_lookups.retain(|_, req| {
let should_drop_lookup =
req.should_drop_lookup_on_disconnected_peer(peer_id );
if should_drop_lookup {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root());
self.single_block_lookups.retain(|_, lookup| {
if lookup.remove_peer(peer_id) {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
false
} else {
true
}
!should_drop_lookup
});
}
@@ -425,6 +430,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Received lookup processing result";
"component" => ?R::response_type(),
"block_root" => ?block_root,
"id" => lookup_id,
"result" => ?result,
);
@@ -496,7 +502,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"block_root" => %block_root,
"block_root" => ?block_root,
"error" => ?e
);
Action::Drop
@@ -505,7 +511,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if e.category() == AvailabilityCheckErrorCategory::Internal =>
{
// There errors indicate internal problems and should not downscore the peer
warn!(self.log, "Internal availability check failure"; "block_root" => %block_root, "error" => ?e);
warn!(self.log, "Internal availability check failure"; "block_root" => ?block_root, "error" => ?e);
// Here we choose *not* to call `on_processing_failure` because this could result in a bad
// lookup state transition. This error invalidates both blob and block requests, and we don't know the
@@ -514,7 +520,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Action::Drop
}
other => {
debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other);
debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);
let peer_id = request_state.on_processing_failure()?;
cx.report_peer(
peer_id,
@@ -540,7 +546,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Ok(LookupResult::Pending)
}
@@ -562,7 +568,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root());
debug!(self.log, "Continuing child lookup"; "parent_root" => ?block_root, "id" => id, "block_root" => ?lookup.block_root());
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
@@ -578,7 +584,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// dropped.
pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) {
if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) {
debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => %dropped_lookup.block_root());
debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => ?dropped_lookup.block_root());
let child_lookups = self
.single_block_lookups
@@ -605,11 +611,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok(LookupResult::Pending) => {} // no action
Ok(LookupResult::Completed) => {
if let Some(lookup) = self.single_block_lookups.remove(&id) {
debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root());
debug!(self.log, "Dropping completed lookup"; "block" => ?lookup.block_root(), "id" => id);
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(lookup.block_root(), cx);
self.update_metrics();
} else {
debug!(self.log, "Attempting to drop non-existent lookup"; "id" => id);
}
}
Err(error) => {

View File

@@ -55,7 +55,7 @@ pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec<NodeChain> {
// Iterate blocks with no children
for tip in nodes {
let mut block_root = tip.block_root;
if parent_to_child.get(&block_root).is_none() {
if !parent_to_child.contains_key(&block_root) {
let mut chain = vec![];
// Resolve chain of blocks

View File

@@ -1,5 +1,5 @@
use super::common::{AwaitingParent, BlockIsProcessed};
use super::{BlockComponent, PeerId};
use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
@@ -150,7 +150,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Wrapper around `RequestState::continue_request` to inject lookup data
/// Potentially makes progress on this request if it's in a progress-able state
pub fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
@@ -163,26 +163,51 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
let block_is_processed = self.block_request_state.state.is_processed();
R::request_state_mut(self).continue_request(
id,
AwaitingParent(awaiting_parent),
downloaded_block_expected_blobs,
BlockIsProcessed(block_is_processed),
cx,
)
}
let request = R::request_state_mut(self);
/// Add all given peers to both block and blob request states.
pub fn add_peer(&mut self, peer_id: PeerId) {
self.block_request_state.state.add_peer(&peer_id);
self.blob_request_state.state.add_peer(&peer_id);
}
// Attempt to progress awaiting downloads
if request.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = request.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}
/// Add all given peers to both block and blob request states.
pub fn add_peers(&mut self, peers: &[PeerId]) {
for peer in peers {
self.add_peer(*peer);
let peer_id = request
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;
// make_request returns true only if a request needs to be made
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
request.get_state_mut().on_download_start()?;
} else {
request.get_state_mut().on_completed_request()?;
}
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent
&& (block_is_processed || matches!(R::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {
return R::send_for_processing(id, result, cx);
}
}
Ok(())
}
/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
let inserted_block = self.block_request_state.state.add_peer(&peer_id);
let inserted_blob = self.blob_request_state.state.add_peer(&peer_id);
inserted_block || inserted_blob
}
/// Returns true if the block has already been downloaded.
@@ -191,21 +216,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&& self.blob_request_state.state.is_processed()
}
/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);
if self.all_available_peers().count() == 0 {
return true;
}
// Note: if the peer disconnected happens to have an on-going request associated with this
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
// now.
false
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id)
&& self.blob_request_state.state.remove_peer(peer_id)
}
}
@@ -464,14 +479,17 @@ impl<T: Clone> SingleLookupRequestState<T> {
self.failed_processing >= self.failed_downloading
}
/// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`.
pub fn add_peer(&mut self, peer_id: &PeerId) {
self.available_peers.insert(*peer_id);
/// Add peer to this request states. The peer must be able to serve this request.
/// Returns true if the peer is newly inserted.
pub fn add_peer(&mut self, peer_id: &PeerId) -> bool {
self.available_peers.insert(*peer_id)
}
/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
/// Remove peer from available peers. Return true if there are no more available peers and the
/// request is not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
self.available_peers.remove(disconnected_peer_id);
self.available_peers.is_empty() && self.is_awaiting_download()
}
pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {

View File

@@ -450,8 +450,25 @@ impl TestRig {
})
}
fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
// Return RPCErrors for all active requests of peer
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Sync(id),
..
} if *peer_id == disconnected_peer_id => Some(*id),
_ => None,
}) {
self.send_sync_message(SyncMessage::RpcError {
peer_id: disconnected_peer_id,
request_id,
error: RPCError::Disconnected,
});
}
}
fn drain_network_rx(&mut self) {

View File

@@ -56,7 +56,7 @@ use lighthouse_network::rpc::RPCError;
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slog::{crit, debug, error, info, o, trace, warn, Logger};
use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
@@ -257,9 +257,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
beacon_chain.clone(),
log.clone(),
),
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
block_lookups: BlockLookups::new(log.clone()),
range_sync: RangeSync::new(
beacon_chain.clone(),
log.new(o!("service" => "range_sync")),
),
backfill_sync: BackFillSync::new(
beacon_chain.clone(),
network_globals,
log.new(o!("service" => "backfill_sync")),
),
block_lookups: BlockLookups::new(log.new(o!("service"=> "lookup_sync"))),
log: log.clone(),
}
}
@@ -366,9 +373,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
let _ = self.backfill_sync.peer_disconnected(peer_id);
self.update_sync_state();
}

View File

@@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);
if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)

View File

@@ -278,9 +278,8 @@ where
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in self
.chains
.call_all(|chain| chain.remove_peer(peer_id, network))
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.remove_peer(peer_id))
{
self.on_chain_removed(
removed_chain,