Light client updates by range RPC (#6383)

* enable lc update over rpc

* resolve TODOs

* resolve merge conflicts

* move max light client updates to eth spec

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into light-client-updates-by-range-rpc

* remove ethspec dependency

* Update beacon_node/network/src/network_beacon_processor/rpc_methods.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/lighthouse_network/src/rpc/methods.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
Eitan Seri-Levi
2024-10-17 19:50:51 -07:00
committed by GitHub
parent 9f1bec6372
commit d1fda938a3
15 changed files with 383 additions and 11 deletions

View File

@@ -14,6 +14,7 @@ use beacon_processor::{
use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
LightClientUpdatesByRangeRequest,
};
use lighthouse_network::rpc::{RequestId, SubstreamId};
use lighthouse_network::{
@@ -831,6 +832,32 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new work event to process a `LightClientUpdatesByRange` request from the RPC network.
pub fn send_light_client_updates_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: LightClientUpdatesByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.handle_light_client_updates_by_range(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::LightClientUpdatesByRangeRequest(Box::new(process_fn)),
})
}
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an internal error.

View File

@@ -10,6 +10,7 @@ use lighthouse_network::rpc::methods::{
};
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use methods::LightClientUpdatesByRangeRequest;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
@@ -428,6 +429,105 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}
pub fn handle_light_client_updates_by_range(
self: &Arc<Self>,
peer_id: PeerId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: LightClientUpdatesByRangeRequest,
) {
self.terminate_response_stream(
peer_id,
connection_id,
substream_id,
request_id,
self.clone()
.handle_light_client_updates_by_range_request_inner(
peer_id,
connection_id,
substream_id,
request_id,
request,
),
Response::LightClientUpdatesByRange,
);
}
/// Handle a `LightClientUpdatesByRange` request from the peer.
pub fn handle_light_client_updates_by_range_request_inner(
self: Arc<Self>,
peer_id: PeerId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
req: LightClientUpdatesByRangeRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
debug!(self.log, "Received LightClientUpdatesByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
"start_period" => req.start_period,
);
// Should not send more than max light client updates
let max_request_size: u64 = req.max_requested();
if req.count > max_request_size {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded max size",
));
}
let lc_updates = match self
.chain
.get_light_client_updates(req.start_period, req.count)
{
Ok(lc_updates) => lc_updates,
Err(e) => {
error!(self.log, "Unable to obtain light client updates";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
for lc_update in lc_updates.iter() {
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::LightClientUpdatesByRange(Some(Arc::new(lc_update.clone()))),
request_id,
id: (connection_id, substream_id),
});
}
let lc_updates_sent = lc_updates.len();
if lc_updates_sent < req.count as usize {
debug!(
self.log,
"LightClientUpdatesByRange outgoing response processed";
"peer" => %peer_id,
"info" => "Failed to return all requested light client updates. The peer may have requested data ahead of whats currently available",
"start_period" => req.start_period,
"requested" => req.count,
"returned" => lc_updates_sent
);
} else {
debug!(
self.log,
"LightClientUpdatesByRange outgoing response processed";
"peer" => %peer_id,
"start_period" => req.start_period,
"requested" => req.count,
"returned" => lc_updates_sent
);
}
Ok(())
}
/// Handle a `LightClientBootstrap` request from the peer.
pub fn handle_light_client_bootstrap(
self: &Arc<Self>,

View File

@@ -311,6 +311,17 @@ impl<T: BeaconChainTypes> Router<T> {
rpc_request.id,
),
),
RequestType::LightClientUpdatesByRange(request) => self
.handle_beacon_processor_send_result(
self.network_beacon_processor
.send_light_client_updates_by_range_request(
peer_id,
request_id.0,
request_id.1,
rpc_request.id,
request,
),
),
_ => {}
}
}
@@ -351,7 +362,8 @@ impl<T: BeaconChainTypes> Router<T> {
// Light client responses should not be received
Response::LightClientBootstrap(_)
| Response::LightClientOptimisticUpdate(_)
| Response::LightClientFinalityUpdate(_) => unreachable!(),
| Response::LightClientFinalityUpdate(_)
| Response::LightClientUpdatesByRange(_) => unreachable!(),
}
}