Generalize sync ActiveRequests (#6398)

* Generalize sync ActiveRequests

* Remove impossible to hit test

* Update beacon_node/lighthouse_network/src/service/api_types.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Update beacon_node/network/src/sync/network_context.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Update beacon_node/network/src/sync/network_context.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Simplify match

* Fix display

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Sampling requests should not expect all responses

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Fix sampling_batch_requests_not_enough_responses_returned test

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into sync-active-request-generalize
This commit is contained in:
Lion - dapplion
2024-10-17 21:14:13 +03:00
committed by GitHub
parent 606a113cff
commit a074e9eb33
10 changed files with 371 additions and 335 deletions

View File

@@ -2,7 +2,6 @@
//! channel and stores a global RPC ID to perform requests.
use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError};
use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest};
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
use super::manager::BlockProcessType;
@@ -30,8 +29,11 @@ use lighthouse_network::service::api_types::{
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use rand::seq::SliceRandom;
use rand::thread_rng;
use requests::ActiveDataColumnsByRootRequest;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems,
DataColumnsByRootRequestItems,
};
use slog::{debug, error, warn};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -180,18 +182,17 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
request_id: Id,
/// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
blocks_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlocksByRootRequest>,
blocks_by_root_requests:
ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,
blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
ActiveRequests<DataColumnsByRootRequestId, DataColumnsByRootRequestItems<T::EthSpec>>,
/// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,
/// BlocksByRange requests paired with BlobsByRange
range_block_components_requests:
FnvHashMap<Id, (RangeRequestId, RangeBlockComponentsRequest<T::EthSpec>)>,
@@ -239,9 +240,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
request_id: 1,
blocks_by_root_requests: <_>::default(),
blobs_by_root_requests: <_>::default(),
data_columns_by_root_requests: <_>::default(),
blocks_by_root_requests: ActiveRequests::new("blocks_by_root"),
blobs_by_root_requests: ActiveRequests::new("blobs_by_root"),
data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"),
custody_by_root_requests: <_>::default(),
range_block_components_requests: FnvHashMap::default(),
network_beacon_processor,
@@ -270,34 +271,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let failed_block_ids = self
.blocks_by_root_requests
.iter()
.filter_map(|(id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::SingleBlock { id: *id })
} else {
None
}
});
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlock { id: *id });
let failed_blob_ids = self
.blobs_by_root_requests
.iter()
.filter_map(|(id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::SingleBlob { id: *id })
} else {
None
}
});
let failed_data_column_by_root_ids =
self.data_columns_by_root_requests
.iter()
.filter_map(|(req_id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
} else {
None
}
});
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlob { id: *id });
let failed_data_column_by_root_ids = self
.data_columns_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id));
failed_range_ids
.chain(failed_block_ids)
@@ -616,8 +602,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request, peer_id));
self.blocks_by_root_requests.insert(
id,
peer_id,
// true = enforce max_requests are returned for blocks_by_root. We always request a single
// block and the peer must have it.
true,
BlocksByRootRequestItems::new(request),
);
Ok(LookupRequestResult::RequestSent(req_id))
}
@@ -677,8 +669,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request, peer_id));
self.blobs_by_root_requests.insert(
id,
peer_id,
// true = enforce max_requests are returned for blobs_by_root. We only issue requests for
// blocks after we know the block has data, and only request peers after they claim to
// have imported the block+blobs.
true,
BlobsByRootRequestItems::new(request),
);
Ok(LookupRequestResult::RequestSent(req_id))
}
@@ -689,8 +688,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
requester: DataColumnsByRootRequester,
peer_id: PeerId,
request: DataColumnsByRootSingleBlockRequest,
expect_max_responses: bool,
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
let req_id = DataColumnsByRootRequestId(self.next_id());
let req_id = DataColumnsByRootRequestId {
id: self.next_id(),
requester,
};
debug!(
self.log,
"Sending DataColumnsByRoot Request";
@@ -705,12 +708,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)),
})?;
self.data_columns_by_root_requests.insert(
req_id,
ActiveDataColumnsByRootRequest::new(request, peer_id, requester),
peer_id,
expect_max_responses,
DataColumnsByRootRequestItems::new(request),
);
Ok(LookupRequestResult::RequestSent(req_id))
@@ -916,142 +921,74 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Request handlers
pub fn on_single_block_response(
pub(crate) fn on_single_block_response(
&mut self,
request_id: SingleLookupReqId,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
return None;
};
let resp = match rpc_event {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
Err(e) => {
// The request must be dropped after receiving an error.
request.remove();
Err(e.into())
}
let response = self.blocks_by_root_requests.on_response(id, rpc_event);
let response = response.map(|res| {
res.and_then(|(mut blocks, seen_timestamp)| {
// Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the
// response count to at most 1.
match blocks.pop() {
Some(block) => Ok((block, seen_timestamp)),
// Should never happen, `blocks_by_root_requests` enforces that we receive at least
// 1 chunk.
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
Err(e) => Err(e.into()),
},
RpcEvent::RPCError(e) => {
request.remove();
Err(e.into())
}
};
if let Err(RpcResponseError::VerifyError(e)) = &resp {
})
});
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
Some(resp)
response
}
pub fn on_single_blob_response(
pub(crate) fn on_single_blob_response(
&mut self,
request_id: SingleLookupReqId,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
return None;
};
let resp = match rpc_event {
RpcEvent::Response(blob, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(blob) {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, seen_timestamp))
.map_err(|e| (e.into(), request.resolve())),
Ok(None) => return None,
Err(e) => Err((e.into(), request.resolve())),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
// (err, false = not resolved) because terminate returns Ok() if resolved
Err(e) => Err((e.into(), false)),
},
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
};
match resp {
Ok(resp) => Some(Ok(resp)),
// Track if this request has already returned some value downstream. Ensure that
// downstream code only receives a single Result per request. If the serving peer does
// multiple penalizable actions per request, downscore and return None. This allows to
// catch if a peer is returning more blobs than requested or if the excess blobs are
// invalid.
Err((e, resolved)) => {
if let RpcResponseError::VerifyError(e) = &e {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
if resolved {
None
} else {
Some(Err(e))
}
}
let response = self.blobs_by_root_requests.on_response(id, rpc_event);
let response = response.map(|res| {
res.and_then(
|(blobs, seen_timestamp)| match to_fixed_blob_sidecar_list(blobs) {
Ok(blobs) => Ok((blobs, seen_timestamp)),
Err(e) => Err(e.into()),
},
)
});
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
response
}
#[allow(clippy::type_complexity)]
pub fn on_data_columns_by_root_response(
pub(crate) fn on_data_columns_by_root_response(
&mut self,
id: DataColumnsByRootRequestId,
_peer_id: PeerId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
return None;
};
let resp = self
.data_columns_by_root_requests
.on_response(id, rpc_event);
self.report_rpc_response_errors(resp, peer_id)
}
let resp = match rpc_event {
RpcEvent::Response(data_column, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(data_column) {
Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
Ok(None) => return None,
Err(e) => Err((e.into(), request.resolve())),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
// (err, false = not resolved) because terminate returns Ok() if resolved
Err(e) => Err((e.into(), false)),
},
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
};
match resp {
Ok(resp) => Some(Ok(resp)),
// Track if this request has already returned some value downstream. Ensure that
// downstream code only receives a single Result per request. If the serving peer does
// multiple penalizable actions per request, downscore and return None. This allows to
// catch if a peer is returning more columns than requested or if the excess blobs are
// invalid.
Err((e, resolved)) => {
if let RpcResponseError::VerifyError(_e) = &e {
// TODO(das): this is a bug, we should not penalise peer in this case.
// confirm this can be removed.
// self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
if resolved {
None
} else {
Some(Err(e))
}
}
fn report_rpc_response_errors<R>(
&mut self,
resp: Option<RpcResponseResult<R>>,
peer_id: PeerId,
) -> Option<RpcResponseResult<R>> {
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
resp
}
/// Insert a downloaded column into an active custody request. Then make progress on the