commit e81dbbfea4 (parent 88006735c4)
Author: realbigsean
Date: 2022-10-03 21:48:02 -04:00

46 changed files with 373 additions and 571 deletions

View File

@@ -570,21 +570,6 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
pub fn tx_blob_by_range_request(
peer_id: PeerId,
request_id: PeerRequestId,
request: TxBlobsByRangeRequest,
) -> Self {
Self {
drop_during_sync: false,
work: Work::TxBlobsByRangeRequest {
peer_id,
request_id,
request,
},
}
}
/// Create a new work event to process `BlocksByRootRequest`s from the RPC network.
pub fn blocks_by_roots_request(
peer_id: PeerId,
@@ -741,13 +726,6 @@ pub enum Work<T: BeaconChainTypes> {
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipBlob {
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blob: Box<BlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
DelayedImportBlock {
peer_id: PeerId,
block: Box<GossipVerifiedBlock<T>>,
@@ -801,11 +779,6 @@ pub enum Work<T: BeaconChainTypes> {
request_id: PeerRequestId,
request: BlocksByRangeRequest,
},
TxBlobsByRangeRequest {
peer_id: PeerId,
request_id: PeerRequestId,
request: TxBlobsByRangeRequest,
},
BlocksByRootsRequest {
peer_id: PeerId,
request_id: PeerRequestId,
@@ -838,7 +811,6 @@ impl<T: BeaconChainTypes> Work<T> {
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::Status { .. } => STATUS_PROCESSING,
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
Work::TxBlobsByRangeRequest { .. } => TX_BLOBS_BY_RANGE_REQUEST,
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
@@ -1090,7 +1062,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, toolbox);
//FIXME(sean)
} else if let Some(item) = gossip_blob_queue.pop() {
} else if let Some(item) = gossip_blobs_sidecar_queue.pop() {
self.spawn_worker(item, toolbox);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
@@ -1331,9 +1303,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::BlocksByRangeRequest { .. } => {
bbrange_queue.push(work, work_id, &self.log)
}
Work::TxBlobsByRangeRequest { .. } => {
txbbrange_queue.push(work, work_id, &self.log)
}
Work::BlocksByRootsRequest { .. } => {
bbroots_queue.push(work, work_id, &self.log)
}
@@ -1571,7 +1540,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_blobs_sidecar(
.process_gossip_blob(
message_id,
peer_id,
peer_client,

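The hunks above drop the `TxBlobsByRange` plumbing and rename the blob queue. The dispatch pattern they touch is a set of bounded FIFO queues, one per `Work` variant, pushed by variant in the match and drained in priority order before `spawn_worker` runs. A minimal, self-contained sketch of that pattern (the `FifoQueue` here is a hypothetical stand-in, not Lighthouse's type):

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the Work variants handled above.
enum Work {
    GossipBlock(String),
    GossipBlobsSidecar(String),
}

// Hypothetical bounded FIFO; the real beacon processor logs and drops
// on overflow rather than blocking the network thread.
struct FifoQueue<T> {
    queue: VecDeque<T>,
    max_len: usize,
}

impl<T> FifoQueue<T> {
    fn new(max_len: usize) -> Self {
        Self { queue: VecDeque::new(), max_len }
    }

    fn push(&mut self, item: T) {
        if self.queue.len() < self.max_len {
            self.queue.push_back(item);
        }
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}

fn main() {
    let mut gossip_block_queue: FifoQueue<Work> = FifoQueue::new(1024);
    let mut gossip_blobs_sidecar_queue = FifoQueue::new(1024);

    gossip_blobs_sidecar_queue.push(Work::GossipBlobsSidecar("sidecar".into()));

    // Blocks drain before blob sidecars, mirroring the priority order of
    // the `if let ... else if let` chain in the event loop above.
    if let Some(_work) = gossip_block_queue.pop() {
        // spawn_worker(work, toolbox)
    } else if let Some(_work) = gossip_blobs_sidecar_queue.pop() {
        // spawn_worker(work, toolbox)
    }
}
```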
View File

@@ -695,12 +695,12 @@ impl<T: BeaconChainTypes> Worker<T> {
}
#[allow(clippy::too_many_arguments)]
pub fn process_gossip_blob(
pub async fn process_gossip_blob(
self,
_message_id: MessageId,
_peer_id: PeerId,
_peer_client: Client,
_blob: BlobsSidecar<T::EthSpec>,
_blob: Arc<SignedBlobsSidecar<T::EthSpec>>,
_reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
_duplicate_cache: DuplicateCache,
_seen_duration: Duration,
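This hunk turns `process_gossip_blob` into an `async fn` that receives the signed sidecar behind an `Arc`, matching the payload carried by the `GossipBlobsSidecar` work event and the `task_spawner.spawn_async` call site in the previous file. A hedged stub of the new shape, with placeholder types standing in for Lighthouse's generics (the body is still a FIXME in this commit):

```rust
use std::sync::Arc;
use std::time::Duration;

// Placeholder for the real SSZ container type.
struct SignedBlobsSidecar;

struct Worker;

impl Worker {
    // Async so gossip verification and import can await chain operations.
    async fn process_gossip_blob(
        self,
        _blob: Arc<SignedBlobsSidecar>,
        _seen_duration: Duration,
    ) {
        // Verification and import would go here; the handler is still a
        // stub (`//FIXME(sean)`) at this point in the branch.
    }
}

fn main() {
    // In Lighthouse this future is driven by the task executor; here we
    // only construct it to keep the sketch dependency-free.
    let fut = Worker.process_gossip_blob(Arc::new(SignedBlobsSidecar), Duration::ZERO);
    drop(fut);
}
```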
@@ -951,22 +951,6 @@ impl<T: BeaconChainTypes> Worker<T> {
let block: Arc<_> = verified_block.block.clone();
let block_root = verified_block.block_root;
let sidecar = if verified_block.block.message()
.body().blob_kzg_commitments().map(|commitments| commitments.is_empty()).unwrap_or(true) {
None
} else if let Some(sidecar) = self.chain.sidecar_waiting_for_block.lock().as_ref() {
if sidecar.message.beacon_block_root == verified_block.block_root() {
Some(sidecar.clone())
} else {
*self.chain.block_waiting_for_sidecar.lock() = Some(verified_block);
return
}
} else {
*self.chain.block_waiting_for_sidecar.lock() = Some(verified_block);
// we need the sidecar but don't have it yet
return
};
match self
.chain
.process_block(block_root, verified_block, CountUnrealized::True)
@@ -1012,7 +996,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Failed to verify execution payload";
"error" => %e
);
}
},
other => {
debug!(
self.log,
@@ -1034,10 +1018,6 @@ impl<T: BeaconChainTypes> Worker<T> {
}
};
}
} else {
*self.chain.sidecar_waiting_for_block.lock() = Some(blobs);
}
}
pub fn process_gossip_voluntary_exit(
self,

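The deleted branches above implemented a rendezvous between a gossip block and its blob sidecar: whichever half arrives first is parked in a `Mutex<Option<_>>` slot on the chain until the counterpart shows up, keyed by the beacon block root. A self-contained sketch of that removed pattern, with plain `u64` roots and simplified types replacing the real ones:

```rust
use std::sync::Mutex;

#[derive(Clone)]
struct Block {
    root: u64,
    has_commitments: bool,
}

#[derive(Clone)]
struct Sidecar {
    beacon_block_root: u64,
}

struct Chain {
    sidecar_waiting_for_block: Mutex<Option<Sidecar>>,
    block_waiting_for_sidecar: Mutex<Option<Block>>,
}

impl Chain {
    /// Returns `Some` once the block can proceed: either it references no
    /// blobs, or its matching sidecar has already arrived.
    fn on_block(&self, block: Block) -> Option<(Block, Option<Sidecar>)> {
        if !block.has_commitments {
            // Empty kzg commitments: import the block without a sidecar.
            return Some((block, None));
        }
        let mut waiting = self.sidecar_waiting_for_block.lock().unwrap();
        match waiting.take() {
            Some(sidecar) if sidecar.beacon_block_root == block.root => {
                Some((block, Some(sidecar)))
            }
            other => {
                // No sidecar yet (or the wrong one): park the block.
                *waiting = other;
                *self.block_waiting_for_sidecar.lock().unwrap() = Some(block);
                None
            }
        }
    }
}

fn main() {
    let chain = Chain {
        sidecar_waiting_for_block: Mutex::new(Some(Sidecar { beacon_block_root: 7 })),
        block_waiting_for_sidecar: Mutex::new(None),
    };
    // The sidecar arrived first, so the matching block completes the pair.
    let paired = chain.on_block(Block { root: 7, has_commitments: true });
    assert!(matches!(paired, Some((_, Some(_)))));
}
```

The commit deletes this coupling outright; at this snapshot of the branch no replacement pairing logic is wired up yet.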
View File

@@ -6,10 +6,9 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, Whe
use itertools::process_results;
use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS};
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error};
use lighthouse_network::rpc::methods::TxBlobsByRangeRequest;
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
@@ -124,15 +123,6 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
pub fn handle_tx_blobs_by_range_request(
&self,
_peer_id: PeerId,
_request_id: PeerRequestId,
_req: TxBlobsByRangeRequest,
) {
//FIXME(sean)
}
/// Handle a `BlocksByRoot` request from the peer.
pub fn handle_blocks_by_root_request(
self,
@@ -400,133 +390,135 @@ impl<T: BeaconChainTypes> Worker<T> {
);
// Should not send more than max request blobs sidecars
if req.count > MAX_REQUEST_BLOCKS {
req.count = MAX_REQUEST_BLOCKS;
if req.count > MAX_REQUEST_BLOBS_SIDECARS {
req.count = MAX_REQUEST_BLOBS_SIDECARS;
}
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
};
//FIXME(sean) create the blobs iter
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.store.get_blobs(&root) {
Ok(Some(blob)) => {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))),
id: request_id,
});
}
Ok(None) => {
error!(
self.log,
"Blob in the chain is not in the store";
"request_root" => ?root
);
break;
}
Err(e) => {
error!(
self.log,
"Error fetching block for peer";
"block_root" => ?root,
"error" => ?e
);
break;
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (req.count as usize) {
debug!(
self.log,
"BlocksByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent
);
} else {
debug!(
self.log,
"BlocksByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(None),
id: request_id,
});
}
drop(send_on_drop);
},
"load_blocks_by_range_blocks",
);
// let forwards_block_root_iter = match self
// .chain
// .forwards_iter_block_roots(Slot::from(req.start_slot))
// {
// Ok(iter) => iter,
// Err(BeaconChainError::HistoricalBlockError(
// HistoricalBlockError::BlockOutOfRange {
// slot,
// oldest_block_slot,
// },
// )) => {
// debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
// return self.send_error_response(
// peer_id,
// RPCResponseErrorCode::ResourceUnavailable,
// "Backfilling".into(),
// request_id,
// );
// }
// Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
// };
//
// // Pick out the required blocks, ignoring skip-slots.
// let mut last_block_root = None;
// let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
// iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// // map skip slots to None
// .map(|(root, _)| {
// let result = if Some(root) == last_block_root {
// None
// } else {
// Some(root)
// };
// last_block_root = Some(root);
// result
// })
// .collect::<Vec<Option<Hash256>>>()
// });
//
// let block_roots = match maybe_block_roots {
// Ok(block_roots) => block_roots,
// Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
// };
//
// // remove all skip slots
// let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
//
// // Fetching blocks is async because it may have to hit the execution layer for payloads.
// executor.spawn(
// async move {
// let mut blocks_sent = 0;
// let mut send_response = true;
//
// for root in block_roots {
// match self.chain.store.get_blobs(&root) {
// Ok(Some(blob)) => {
// blocks_sent += 1;
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))),
// id: request_id,
// });
// }
// Ok(None) => {
// error!(
// self.log,
// "Blob in the chain is not in the store";
// "request_root" => ?root
// );
// break;
// }
// Err(e) => {
// error!(
// self.log,
// "Error fetching block for peer";
// "block_root" => ?root,
// "error" => ?e
// );
// break;
// }
// }
// }
//
// let current_slot = self
// .chain
// .slot()
// .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
//
// if blocks_sent < (req.count as usize) {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "msg" => "Failed to return all requested blocks",
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// } else {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// }
//
// if send_response {
// // send the stream terminator
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(None),
// id: request_id,
// });
// }
//
// drop(send_on_drop);
// },
// "load_blocks_by_range_blocks",
// );
}
}
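The new live handler clamps `req.count` to `MAX_REQUEST_BLOBS_SIDECARS` and reuses the block-root machinery: the forwards iterator repeats the previous root across skip slots, so consecutive duplicates are mapped to `None` and flattened away before blobs are fetched from the store. A standalone sketch of those two steps, with plain `u64` roots instead of `Hash256` and an illustrative (not spec-checked) bound:

```rust
// Illustrative bound only; the real constant comes from
// lighthouse_network::rpc::methods::MAX_REQUEST_BLOBS_SIDECARS.
const MAX_REQUEST_BLOBS_SIDECARS: u64 = 128;

fn clamp_count(requested: u64) -> u64 {
    requested.min(MAX_REQUEST_BLOBS_SIDECARS)
}

/// Collapse skip slots: the forwards root iterator yields the previous
/// block's root for empty slots, so drop consecutive duplicates.
fn dedup_skip_slots(roots_by_slot: impl IntoIterator<Item = u64>) -> Vec<u64> {
    let mut last = None;
    roots_by_slot
        .into_iter()
        .filter_map(|root| {
            let keep = (last != Some(root)).then_some(root);
            last = Some(root);
            keep
        })
        .collect()
}

fn main() {
    assert_eq!(clamp_count(1_000_000), MAX_REQUEST_BLOBS_SIDECARS);

    // The slot holding 0xbb was followed by two skip slots repeating its root.
    let roots = [0xaa, 0xbb, 0xbb, 0xbb, 0xcc];
    assert_eq!(dedup_skip_slots(roots), vec![0xaa, 0xbb, 0xcc]);
}
```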