Reconstruct Payloads using Payload Bodies Methods (#4028)

## Issue Addressed

* #3895 

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
ethDreamer
2023-03-19 23:15:59 +00:00
parent 020fb483fe
commit 65a5eb8292
18 changed files with 1335 additions and 33 deletions

View File

@@ -9,8 +9,8 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio_stream::StreamExt;
use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot};
use super::Worker;
@@ -131,21 +131,25 @@ impl<T: BeaconChainTypes> Worker<T> {
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
let requested_blocks = request.block_roots.len();
let mut block_stream = match self
.chain
.get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor)
{
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut send_block_count = 0;
let mut send_response = true;
for root in request.block_roots.iter() {
match self
.chain
.get_block_checking_early_attester_cache(root)
.await
{
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
self.send_response(
peer_id,
Response::BlocksByRoot(Some(block)),
Response::BlocksByRoot(Some(block.clone())),
request_id,
);
send_block_count += 1;
@@ -190,7 +194,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.log,
"Received BlocksByRoot Request";
"peer" => %peer_id,
"requested" => request.block_roots.len(),
"requested" => requested_blocks,
"returned" => %send_block_count
);
@@ -344,14 +348,19 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.get_block(&root).await {
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
@@ -361,7 +370,7 @@ impl<T: BeaconChainTypes> Worker<T> {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(Some(Arc::new(block))),
response: Response::BlocksByRange(Some(block.clone())),
id: request_id,
});
}