Merge commit '65a5eb829264cb279ed66814c961991ae3a0a04b' into eip4844

This commit is contained in:
Diva M
2023-03-24 13:24:21 -05:00
18 changed files with 1368 additions and 36 deletions

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::beacon_processor::{worker::FUTURE_SLOT_TOLERANCE, SendOnDrop};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
@@ -12,10 +14,9 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::light_client_bootstrap::LightClientBootstrap;
use types::{Epoch, EthSpec, Hash256, Slot};
use tokio_stream::StreamExt;
use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot};
use super::Worker;
@@ -138,21 +139,25 @@ impl<T: BeaconChainTypes> Worker<T> {
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
let requested_blocks = request.block_roots.len();
let mut block_stream = match self
.chain
.get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor)
{
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut send_block_count = 0;
let mut send_response = true;
for root in request.block_roots.iter() {
match self
.chain
.get_block_checking_early_attester_cache(root)
.await
{
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
self.send_response(
peer_id,
Response::BlocksByRoot(Some(block)),
Response::BlocksByRoot(Some(block.clone())),
request_id,
);
send_block_count += 1;
@@ -197,8 +202,8 @@ impl<T: BeaconChainTypes> Worker<T> {
self.log,
"Received BlocksByRoot Request";
"peer" => %peer_id,
"requested" => request.block_roots.len(),
"returned" => send_block_count
"requested" => requested_blocks,
"returned" => %send_block_count
);
// send stream termination
@@ -519,14 +524,19 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.get_block(&root).await {
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
@@ -536,7 +546,7 @@ impl<T: BeaconChainTypes> Worker<T> {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(Some(Arc::new(block))),
response: Response::BlocksByRange(Some(block.clone())),
id: request_id,
});
}