improve peer scoring

This commit is contained in:
realbigsean
2023-05-02 18:28:45 -04:00
parent e3f4218624
commit af15789b6f
3 changed files with 192 additions and 43 deletions

View File

@@ -1,5 +1,5 @@
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{PeerAction, PeerId};
@@ -17,7 +17,7 @@ use types::{BlobSidecar, SignedBeaconBlock, Slot};
use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE};
use self::parent_lookup::{ParentLookup, ParentVerifyError};
use self::single_block_lookup::SingleBlockLookup;
use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup};
use super::manager::BlockProcessingResult;
use super::BatchProcessResult;
use super::{
@@ -105,6 +105,18 @@ impl PeerSource {
PeerSource::Gossip(id) => id,
}
}
fn should_have_block(&self) -> bool {
match self {
PeerSource::Attestation(_) => true,
PeerSource::Gossip(_) => false,
}
}
fn should_have_blobs(&self) -> bool {
match self {
PeerSource::Attestation(_) => true,
PeerSource::Gossip(_) => false,
}
}
}
#[derive(Debug, Copy, Clone)]
@@ -317,10 +329,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let LookupDownloadStatus::AvailabilityCheck(e) =
request_ref.add_block(root, block)
{
handle_block_lookup_verify_error(
handle_availability_check_error(
request_id_ref,
request_ref,
ResponseType::Block,
ResponseType::Blob,
peer_id,
e,
cx,
@@ -389,7 +401,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let LookupDownloadStatus::AvailabilityCheck(e) =
request_ref.add_blobs(block_root, blobs)
{
handle_block_lookup_verify_error(
handle_availability_check_error(
request_id_ref,
request_ref,
ResponseType::Blob,
@@ -540,7 +552,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
}
}
LookupDownloadStatus::AvailabilityCheck(e) => {}
LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block(
BlockError::AvailabilityCheck(e),
peer_id,
cx,
ResponseType::Block,
parent_lookup,
),
}
}
Ok(None) => {
@@ -551,6 +569,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Err(e) => match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
@@ -581,6 +600,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"bbroot_failed_chains",
);
}
ParentVerifyError::BenignFailure => {
trace!(
self.log,
"Requested peer could not respond to block request, requesting a new peer";
);
parent_lookup
.current_parent_request
.block_request_state
.potential_peers
.remove(&peer_id);
self.request_parent_block(parent_lookup, cx);
}
},
};
@@ -640,7 +671,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
}
}
LookupDownloadStatus::AvailabilityCheck(e) => {}
LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block(
BlockError::AvailabilityCheck(e),
peer_id,
cx,
ResponseType::Blob,
parent_lookup,
),
}
}
Ok(None) => {
@@ -651,6 +688,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Err(e) => match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
@@ -681,6 +719,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"bbroot_failed_chains",
);
}
ParentVerifyError::BenignFailure => {
trace!(
self.log,
"Requested peer could not respond to blob request, requesting a new peer";
);
parent_lookup
.current_parent_request
.blob_request_state
.potential_peers
.remove(&peer_id);
self.request_parent_blob(parent_lookup, cx);
}
},
};
@@ -1040,34 +1090,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
BlockProcessingResult::Err(outcome) => {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
self.log, "Invalid parent chain";
"score_adjustment" => %PeerAction::MidToleranceError,
"outcome" => ?outcome,
"last_peer" => %peer_id,
);
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
cx.report_peer(
self.handle_invalid_block(
outcome,
peer_id.to_peer_id(),
PeerAction::MidToleranceError,
"parent_request_err",
cx,
response_type,
parent_lookup,
);
// Try again if possible
match response_type {
ResponseType::Block => {
parent_lookup.block_processing_failed();
self.request_parent_block(parent_lookup, cx);
}
ResponseType::Blob => {
parent_lookup.blob_processing_failed();
self.request_parent_blob(parent_lookup, cx);
}
}
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
@@ -1086,6 +1115,38 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
fn handle_invalid_block(
&mut self,
outcome: BlockError<<T as BeaconChainTypes>::EthSpec>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
response_type: ResponseType,
mut parent_lookup: ParentLookup<T>,
) {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
self.log, "Invalid parent chain";
"score_adjustment" => %PeerAction::MidToleranceError,
"outcome" => ?outcome,
"last_peer" => %peer_id,
);
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
// Try again if possible
match response_type {
ResponseType::Block => {
parent_lookup.block_processing_failed();
self.request_parent_block(parent_lookup, cx);
}
ResponseType::Blob => {
parent_lookup.blob_processing_failed();
self.request_parent_blob(parent_lookup, cx);
}
}
}
pub fn parent_chain_processed(
&mut self,
chain_hash: Hash256,
@@ -1324,17 +1385,62 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
fn handle_block_lookup_verify_error<T: BeaconChainTypes, Err: Into<&'static str>>(
fn handle_availability_check_error<T: BeaconChainTypes>(
request_id_ref: &mut u32,
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
response_type: ResponseType,
peer_id: PeerId,
e: Err,
e: AvailabilityCheckError,
cx: &mut SyncNetworkContext<T>,
log: &Logger,
) -> ShouldRemoveLookup {
let msg = e.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
warn!(log, "Peer sent message in single block lookup causing failed availability check";
"root" => ?request_ref.requested_block_root,
"error" => ?e,
"peer_id" => %peer_id,
"response_type" => ?response_type
);
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
"single_block_failure",
);
retry_request_after_failure(
request_id_ref,
request_ref,
ResponseType::Blob,
&peer_id,
cx,
log,
)
}
fn handle_block_lookup_verify_error<T: BeaconChainTypes>(
request_id_ref: &mut u32,
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
response_type: ResponseType,
peer_id: PeerId,
e: LookupVerifyError,
cx: &mut SyncNetworkContext<T>,
log: &Logger,
) -> ShouldRemoveLookup {
let msg = if matches!(e, LookupVerifyError::BenignFailure) {
match response_type {
ResponseType::Block => request_ref
.block_request_state
.potential_peers
.remove(&peer_id),
ResponseType::Blob => request_ref
.blob_request_state
.potential_peers
.remove(&peer_id),
};
"peer could not response to request"
} else {
let msg = e.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
msg
};
debug!(log, "Single block lookup failed";
"peer_id" => %peer_id,

View File

@@ -40,11 +40,13 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
pub enum ParentVerifyError {
RootMismatch,
NoBlockReturned,
NotEnoughBlobsReturned,
ExtraBlocksReturned,
UnrequestedBlobId,
ExtraBlobsReturned,
InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 },
BenignFailure,
}
#[derive(Debug, PartialEq, Eq)]
@@ -369,6 +371,8 @@ impl From<LookupVerifyError> for ParentVerifyError {
E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId,
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned,
E::BenignFailure => ParentVerifyError::BenignFailure,
}
}
}

View File

@@ -1,7 +1,7 @@
use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus;
use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
@@ -59,8 +59,12 @@ pub enum LookupVerifyError {
ExtraBlocksReturned,
UnrequestedBlobId,
ExtraBlobsReturned,
NotEnoughBlobsReturned,
InvalidIndex(u64),
AvailabilityCheck(String),
/// We don't have enough information to know
/// whether the peer is at fault or simply missed
/// what was requested on gossip.
BenignFailure,
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@@ -201,8 +205,12 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
None => {
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
if peer_id.should_have_block() {
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
} else {
Err(LookupVerifyError::BenignFailure)
}
}
},
State::Processing { peer_id: _ } => match block {
@@ -229,7 +237,9 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.blob_request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlobsReturned)
}
State::Downloading { peer_id } => match blob {
State::Downloading {
peer_id: peer_source,
} => match blob {
Some(blob) => {
let received_id = blob.id();
if !self.requested_ids.contains(&received_id) {
@@ -250,7 +260,15 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
None => {
self.blob_request_state.state = State::Processing { peer_id };
let downloaded_all_blobs = self.downloaded_all_blobs();
if peer_source.should_have_blobs() && !downloaded_all_blobs {
return Err(LookupVerifyError::NotEnoughBlobsReturned);
} else if !downloaded_all_blobs {
return Err(LookupVerifyError::BenignFailure);
}
self.blob_request_state.state = State::Processing {
peer_id: peer_source,
};
Ok(Some((
self.requested_block_root,
self.downloaded_blobs.clone(),
@@ -272,6 +290,27 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
fn downloaded_all_blobs(&self) -> bool {
let expected_num_blobs = self
.downloaded_block
.as_ref()
.map(|block| {
block
.message()
.body()
.blob_kzg_commitments()
.map_or(0, |commitments| commitments.len())
})
.unwrap_or(0);
let downloaded_enough_blobs = expected_num_blobs
== self
.downloaded_blobs
.iter()
.map(|blob_opt| blob_opt.is_some())
.count();
downloaded_enough_blobs
}
pub fn request_block(
&mut self,
) -> Result<Option<(PeerId, BlocksByRootRequest)>, LookupRequestError> {