Handle sync lookup request streams in network context (#5583)

* by-root-stream-terminator

* Fix tests

* Resolve merge conflicts

* Log report reason

* Some lints and bugfixes (#23)

* fix lints

* bug fixes

* Fix tests

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into handle-sync-lookup-requests

* Pr 5583 review (#24)

* add bad state warn log

* add rust docs to new fields in `SyncNetworkContext`

* remove timestamp todo

* add back lookup verify error

* remove TODOs
This commit is contained in:
Lion - dapplion
2024-04-23 01:06:39 +09:00
committed by GitHub
parent 67f8405921
commit f7aca97a55
8 changed files with 644 additions and 540 deletions

View File

@@ -1,6 +1,8 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest};
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
@@ -9,17 +11,23 @@ use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::manager::SingleLookupReqId;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use slog::{debug, trace, warn};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
mod requests;
pub struct BlocksAndBlobsByRangeResponse<E: EthSpec> {
pub sender_id: RangeRequestId,
pub responses: Result<Vec<RpcBlock<E>>, String>,
@@ -37,6 +45,41 @@ pub enum RangeRequestId {
},
}
#[derive(Debug)]
pub enum RpcEvent<T> {
StreamTermination,
Response(T, Duration),
RPCError(RPCError),
}
pub type RpcProcessingResult<T> = Option<Result<(T, Duration), LookupFailure>>;
pub enum LookupFailure {
RpcError(RPCError),
LookupVerifyError(LookupVerifyError),
}
impl std::fmt::Display for LookupFailure {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e),
LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e),
}
}
}
impl From<RPCError> for LookupFailure {
fn from(e: RPCError) -> Self {
LookupFailure::RpcError(e)
}
}
impl From<LookupVerifyError> for LookupFailure {
fn from(e: LookupVerifyError) -> Self {
LookupFailure::LookupVerifyError(e)
}
}
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// The network channel to relay messages to the Network service.
@@ -45,6 +88,12 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A sequential ID for all RPC requests.
request_id: Id,
/// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
blocks_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlocksByRootRequest>,
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,
/// BlocksByRange requests paired with BlobsByRange
range_blocks_and_blobs_requests:
FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
@@ -91,6 +140,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
request_id: 1,
blocks_by_root_requests: <_>::default(),
blobs_by_root_requests: <_>::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
network_beacon_processor,
chain,
@@ -245,62 +296,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
pub fn block_lookup_request(
&self,
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
request: BlocksByRootRequest,
request: BlocksByRootSingleRequest,
) -> Result<(), &'static str> {
debug!(
self.log,
"Sending BlocksByRoot Request";
"method" => "BlocksByRoot",
"block_roots" => ?request.block_roots().to_vec(),
"block_root" => ?request.0,
"peer" => %peer_id,
"id" => ?id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRoot(request),
request: Request::BlocksByRoot(request.into_request(&self.chain.spec)),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
})?;
self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request));
Ok(())
}
pub fn blob_lookup_request(
&self,
&mut self,
id: SingleLookupReqId,
blob_peer_id: PeerId,
blob_request: BlobsByRootRequest,
peer_id: PeerId,
request: BlobsByRootSingleBlockRequest,
) -> Result<(), &'static str> {
if let Some(block_root) = blob_request
.blob_ids
.as_slice()
.first()
.map(|id| id.block_root)
{
let indices = blob_request
.blob_ids
.as_slice()
.iter()
.map(|id| id.index)
.collect::<Vec<_>>();
debug!(
self.log,
"Sending BlobsByRoot Request";
"method" => "BlobsByRoot",
"block_root" => ?block_root,
"blob_indices" => ?indices,
"peer" => %blob_peer_id,
"id" => ?id
);
debug!(
self.log,
"Sending BlobsByRoot Request";
"method" => "BlobsByRoot",
"block_root" => ?request.block_root,
"blob_indices" => ?request.indices,
"peer" => %peer_id,
"id" => ?id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
})?;
self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request));
self.send_network_msg(NetworkMessage::SendRequest {
peer_id: blob_peer_id,
request: Request::BlobsByRoot(blob_request),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
})?;
}
Ok(())
}
@@ -329,7 +375,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Reports to the scoring algorithm the behaviour of a peer.
pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) {
debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action);
debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action, "msg" => %msg);
self.network_send
.send(NetworkMessage::ReportPeer {
peer_id,
@@ -405,4 +451,86 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.range_blocks_and_blobs_requests
.insert(id, (sender_id, info));
}
// Request handlers
pub fn on_single_block_response(
&mut self,
request_id: SingleLookupReqId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> RpcProcessingResult<Arc<SignedBeaconBlock<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
return None;
};
Some(match block {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
Err(e) => {
// The request must be dropped after receiving an error.
request.remove();
Err(e.into())
}
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
Err(e) => Err(e.into()),
},
RpcEvent::RPCError(e) => {
request.remove();
Err(e.into())
}
})
}
pub fn on_single_blob_response(
&mut self,
request_id: SingleLookupReqId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> RpcProcessingResult<FixedBlobSidecarList<T::EthSpec>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
return None;
};
Some(match blob {
RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, timestamp_now()))
.map_err(Into::into),
Ok(None) => return None,
Err(e) => {
request.remove();
Err(e.into())
}
},
RpcEvent::StreamTermination => {
// Stream terminator
match request.remove().terminate() {
Some(blobs) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, timestamp_now()))
.map_err(Into::into),
None => return None,
}
}
RpcEvent::RPCError(e) => {
request.remove();
Err(e.into())
}
})
}
}
fn to_fixed_blob_sidecar_list<E: EthSpec>(
blobs: Vec<Arc<BlobSidecar<E>>>,
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
let mut fixed_list = FixedBlobSidecarList::default();
for blob in blobs.into_iter() {
let index = blob.index as usize;
*fixed_list
.get_mut(index)
.ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
}
Ok(fixed_list)
}