mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 03:31:45 +00:00
Implement data columns by network boilerplate (#6224)
* Implement data columns by network boilerplate * Use correct quota values * Address PR review * Update currently_supported * Merge remote-tracking branch 'sigp/unstable' into peerdas-network-boilerplate * PR reviews * Fix data column rpc request not being sent due to incorrect limits set. (#6000)
This commit is contained in:
@@ -8,7 +8,9 @@ use beacon_processor::{
|
||||
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
|
||||
WorkEvent as BeaconWorkEvent,
|
||||
};
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||
};
|
||||
use lighthouse_network::{
|
||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
|
||||
@@ -602,6 +604,40 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new work event to process `DataColumnsByRootRequest`s from the RPC network.
|
||||
pub fn send_data_columns_by_roots_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: DataColumnsByRootRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn =
|
||||
move || processor.handle_data_columns_by_root_request(peer_id, request_id, request);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::DataColumnsByRootsRequest(Box::new(process_fn)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new work event to process `DataColumnsByRange`s from the RPC network.
|
||||
pub fn send_data_columns_by_range_request(
|
||||
self: &Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: DataColumnsByRangeRequest,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn =
|
||||
move || processor.handle_data_columns_by_range_request(peer_id, request_id, request);
|
||||
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::DataColumnsByRangeRequest(Box::new(process_fn)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new work event to process `LightClientBootstrap`s from the RPC network.
|
||||
pub fn send_light_client_bootstrap_request(
|
||||
self: &Arc<Self>,
|
||||
|
||||
@@ -4,7 +4,9 @@ use crate::status::ToStatusMessage;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
|
||||
use itertools::process_results;
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||
};
|
||||
use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
|
||||
use slog::{debug, error, warn};
|
||||
@@ -314,6 +316,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a `DataColumnsByRoot` request from the peer.
|
||||
pub fn handle_data_columns_by_root_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
_request_id: PeerRequestId,
|
||||
request: DataColumnsByRootRequest,
|
||||
) {
|
||||
// TODO(das): implement handler
|
||||
debug!(self.log, "Received DataColumnsByRoot Request";
|
||||
"peer_id" => %peer_id,
|
||||
"count" => request.data_column_ids.len()
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle a `LightClientBootstrap` request from the peer.
|
||||
pub fn handle_light_client_bootstrap(
|
||||
self: &Arc<Self>,
|
||||
@@ -815,6 +831,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a `DataColumnsByRange` request from the peer.
|
||||
pub fn handle_data_columns_by_range_request(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
_request_id: PeerRequestId,
|
||||
req: DataColumnsByRangeRequest,
|
||||
) {
|
||||
// TODO(das): implement handler
|
||||
debug!(self.log, "Received DataColumnsByRange Request";
|
||||
"peer_id" => %peer_id,
|
||||
"count" => req.count,
|
||||
"start_slot" => req.start_slot,
|
||||
);
|
||||
}
|
||||
|
||||
/// Helper function to ensure single item protocol always end with either a single chunk or an
|
||||
/// error
|
||||
fn terminate_response_single_item<R, F: Fn(R) -> Response<T::EthSpec>>(
|
||||
|
||||
@@ -27,7 +27,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
/// Handles messages from the network and routes them to the appropriate service to be handled.
|
||||
pub struct Router<T: BeaconChainTypes> {
|
||||
@@ -216,6 +216,14 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
self.network_beacon_processor
|
||||
.send_blobs_by_roots_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::DataColumnsByRoot(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_data_columns_by_roots_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::DataColumnsByRange(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_data_columns_by_range_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result(
|
||||
self.network_beacon_processor
|
||||
.send_light_client_bootstrap_request(peer_id, request_id, request),
|
||||
@@ -258,6 +266,12 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
Response::BlobsByRoot(blob) => {
|
||||
self.on_blobs_by_root_response(peer_id, request_id, blob);
|
||||
}
|
||||
Response::DataColumnsByRoot(data_column) => {
|
||||
self.on_data_columns_by_root_response(peer_id, request_id, data_column);
|
||||
}
|
||||
Response::DataColumnsByRange(data_column) => {
|
||||
self.on_data_columns_by_range_response(peer_id, request_id, data_column);
|
||||
}
|
||||
// Light client responses should not be received
|
||||
Response::LightClientBootstrap(_)
|
||||
| Response::LightClientOptimisticUpdate(_)
|
||||
@@ -507,11 +521,11 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
SyncRequestId::SingleBlock { .. } | SyncRequestId::SingleBlob { .. } => {
|
||||
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
|
||||
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
|
||||
other => {
|
||||
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
|
||||
return;
|
||||
}
|
||||
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
|
||||
},
|
||||
AppRequestId::Router => {
|
||||
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
|
||||
@@ -570,12 +584,8 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
let request_id = match request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SingleBlock { .. } => id,
|
||||
SyncRequestId::RangeBlockAndBlobs { .. } => {
|
||||
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
|
||||
return;
|
||||
}
|
||||
SyncRequestId::SingleBlob { .. } => {
|
||||
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
|
||||
other => {
|
||||
crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other);
|
||||
return;
|
||||
}
|
||||
},
|
||||
@@ -608,12 +618,8 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
let request_id = match request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SingleBlob { .. } => id,
|
||||
SyncRequestId::SingleBlock { .. } => {
|
||||
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
|
||||
return;
|
||||
}
|
||||
SyncRequestId::RangeBlockAndBlobs { .. } => {
|
||||
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
|
||||
other => {
|
||||
crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other);
|
||||
return;
|
||||
}
|
||||
},
|
||||
@@ -636,6 +642,67 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `DataColumnsByRoot` response from the peer.
|
||||
pub fn on_data_columns_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::DataColumnsByRoot { .. } => id,
|
||||
other => {
|
||||
crit!(self.log, "DataColumnsByRoot response on incorrect request"; "request" => ?other);
|
||||
return;
|
||||
}
|
||||
},
|
||||
AppRequestId::Router => {
|
||||
crit!(self.log, "All DataColumnsByRoot requests belong to sync"; "peer_id" => %peer_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received DataColumnsByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
peer_id,
|
||||
data_column,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn on_data_columns_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received DataColumnsByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
if let AppRequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcDataColumn {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
data_column,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
} else {
|
||||
crit!(
|
||||
self.log,
|
||||
"All data columns by range responses should belong to sync"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_beacon_processor_send_result(
|
||||
&mut self,
|
||||
result: Result<(), crate::network_beacon_processor::Error<T::EthSpec>>,
|
||||
|
||||
@@ -101,6 +101,14 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
|
||||
/// A data columns has been received from the RPC
|
||||
RpcDataColumn {
|
||||
request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
data_column: Option<Arc<DataColumnSidecar<E>>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
|
||||
/// A block with an unknown parent has been received.
|
||||
UnknownParentBlock(PeerId, RpcBlock<E>, Hash256),
|
||||
|
||||
@@ -337,6 +345,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
SyncRequestId::SingleBlob { id } => {
|
||||
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::DataColumnsByRoot { .. } => {
|
||||
// TODO(das)
|
||||
}
|
||||
SyncRequestId::RangeBlockAndBlobs { id } => {
|
||||
if let Some(sender_id) = self.network.range_request_failed(id) {
|
||||
match sender_id {
|
||||
@@ -614,6 +625,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
blob_sidecar,
|
||||
seen_timestamp,
|
||||
} => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp),
|
||||
SyncMessage::RpcDataColumn {
|
||||
request_id,
|
||||
peer_id,
|
||||
data_column,
|
||||
seen_timestamp,
|
||||
} => self.rpc_data_column_received(request_id, peer_id, data_column, seen_timestamp),
|
||||
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
||||
let block_slot = block.slot();
|
||||
let parent_root = block.parent_root();
|
||||
@@ -846,6 +863,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
SyncRequestId::SingleBlob { .. } => {
|
||||
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
|
||||
}
|
||||
SyncRequestId::DataColumnsByRoot { .. } => {
|
||||
// TODO(das)
|
||||
}
|
||||
SyncRequestId::RangeBlockAndBlobs { id } => {
|
||||
self.range_block_and_blobs_response(id, peer_id, block.into())
|
||||
}
|
||||
@@ -888,12 +908,25 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
None => RpcEvent::StreamTermination,
|
||||
},
|
||||
),
|
||||
SyncRequestId::DataColumnsByRoot { .. } => {
|
||||
// TODO(das)
|
||||
}
|
||||
SyncRequestId::RangeBlockAndBlobs { id } => {
|
||||
self.range_block_and_blobs_response(id, peer_id, blob.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn rpc_data_column_received(
|
||||
&mut self,
|
||||
_request_id: SyncRequestId,
|
||||
_peer_id: PeerId,
|
||||
_data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
|
||||
_seen_timestamp: Duration,
|
||||
) {
|
||||
// TODO(das): implement handler
|
||||
}
|
||||
|
||||
fn on_single_blob_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
|
||||
Reference in New Issue
Block a user