block and blob handling progress

This commit is contained in:
realbigsean
2022-11-19 16:53:34 -05:00
parent 45897ad4e1
commit dc87156641
11 changed files with 237 additions and 43 deletions

View File

@@ -1541,6 +1541,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
peer_client,
block,
None,
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
@@ -1558,11 +1559,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_block_and_blobs_sidecar(
.process_gossip_block(
message_id,
peer_id,
peer_client,
block_and_blobs,
block_and_blobs.beacon_block.clone(),
Some(block_and_blobs.blobs_sidecar.clone()),
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
)
.await
@@ -1720,6 +1724,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
)
}),
Work::BlobsByRootsRequest {
peer_id,
request_id,
request,
} => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
worker.handle_blocks_by_root_request(
sub_executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
}),
Work::UnknownBlockAttestation {
message_id,
peer_id,

View File

@@ -658,6 +658,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId,
peer_client: Client,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
blobs: Option<Arc<BlobsSidecar<T::EthSpec>>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
seen_duration: Duration,
@@ -668,6 +669,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id,
peer_client,
block,
blobs,
reprocess_tx.clone(),
seen_duration,
)
@@ -705,6 +707,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId,
peer_client: Client,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
blobs: Option<Arc<BlobsSidecar<T::EthSpec>>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
seen_duration: Duration,
) -> Option<GossipVerifiedBlock<T>> {
@@ -719,7 +722,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let verification_result = self
.chain
.clone()
.verify_block_for_gossip(block.clone())
.verify_block_for_gossip(block.clone(), blobs)
.await;
let block_root = if let Ok(verified_block) = &verification_result {

View File

@@ -4,7 +4,9 @@ use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
use itertools::process_results;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, MAX_REQUEST_BLOBS_SIDECARS,
};
use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
@@ -12,7 +14,7 @@ use slog::{debug, error};
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{Epoch, EthSpec, Hash256, Slot};
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot};
use super::Worker;
@@ -204,6 +206,106 @@ impl<T: BeaconChainTypes> Worker<T> {
"load_blocks_by_root_blocks",
)
}
/// Handle a `BlobsByRoot` request from the peer.
pub fn handle_blobs_by_root_request(
self,
executor: TaskExecutor,
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRootRequest,
) {
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut send_block_count = 0;
let mut send_response = true;
for root in request.block_roots.iter() {
match self
.chain
.get_block_and_blobs_checking_early_attester_cache(root)
.await
{
Ok((Some(block), Some(blobs))) => {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))),
request_id,
);
send_block_count += 1;
}
Ok((None, None)) => {
debug!(
self.log,
"Peer requested unknown block and blobs";
"peer" => %peer_id,
"request_root" => ?root
);
}
Ok((Some(_), None)) => {
debug!(
self.log,
"Peer requested block and blob, but no blob found";
"peer" => %peer_id,
"request_root" => ?root
);
}
Ok((None, Some(_))) => {
debug!(
self.log,
"Peer requested block and blob, but no block found";
"peer" => %peer_id,
"request_root" => ?root
);
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
self.log,
"Failed to fetch execution payload for block and blobs by root request";
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
send_response = false;
break;
}
Err(e) => {
debug!(
self.log,
"Error fetching block for peer";
"peer" => %peer_id,
"request_root" => ?root,
"error" => ?e,
);
}
}
}
debug!(
self.log,
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => request.block_roots.len(),
"returned" => %send_block_count
);
// send stream termination
if send_response {
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
}
drop(send_on_drop);
},
"load_blobs_by_root_blocks",
)
}
/// Handle a `BlocksByRange` request from the peer.
pub fn handle_blocks_by_range_request(

View File

@@ -68,16 +68,6 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32;
pub type Id = u32;
pub struct SeansBlock {}
pub struct SeansBlob {}
/// This is the one that has them both and goes to range.
pub struct SeansBlockBlob {
block: SeansBlock,
blob: SeansBlob,
}
/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId {