add blobs cache and fix some block production

This commit is contained in:
realbigsean
2022-11-21 14:09:06 -05:00
parent dc87156641
commit e7ee79185b
17 changed files with 315 additions and 189 deletions

View File

@@ -1729,7 +1729,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request_id,
request,
} => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
worker.handle_blocks_by_root_request(
worker.handle_blobs_by_root_request(
sub_executor,
send_idle_on_drop,
peer_id,

View File

@@ -12,6 +12,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error};
use slot_clock::SlotClock;
use ssz_types::VariableList;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot};
@@ -498,129 +499,129 @@ impl<T: BeaconChainTypes> Worker<T> {
//FIXME(sean) create the blobs iter
// let forwards_block_root_iter = match self
// .chain
// .forwards_iter_block_roots(Slot::from(req.start_slot))
// {
// Ok(iter) => iter,
// Err(BeaconChainError::HistoricalBlockError(
// HistoricalBlockError::BlockOutOfRange {
// slot,
// oldest_block_slot,
// },
// )) => {
// debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
// return self.send_error_response(
// peer_id,
// RPCResponseErrorCode::ResourceUnavailable,
// "Backfilling".into(),
// request_id,
// );
// }
// Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
// };
//
// // Pick out the required blocks, ignoring skip-slots.
// let mut last_block_root = None;
// let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
// iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// // map skip slots to None
// .map(|(root, _)| {
// let result = if Some(root) == last_block_root {
// None
// } else {
// Some(root)
// };
// last_block_root = Some(root);
// result
// })
// .collect::<Vec<Option<Hash256>>>()
// });
//
// let block_roots = match maybe_block_roots {
// Ok(block_roots) => block_roots,
// Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
// };
//
// // remove all skip slots
// let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
//
// // Fetching blocks is async because it may have to hit the execution layer for payloads.
// executor.spawn(
// async move {
// let mut blocks_sent = 0;
// let mut send_response = true;
//
// for root in block_roots {
// match self.chain.store.get_blobs(&root) {
// Ok(Some(blob)) => {
// blocks_sent += 1;
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))),
// id: request_id,
// });
// }
// Ok(None) => {
// error!(
// self.log,
// "Blob in the chain is not in the store";
// "request_root" => ?root
// );
// break;
// }
// Err(e) => {
// error!(
// self.log,
// "Error fetching block for peer";
// "block_root" => ?root,
// "error" => ?e
// );
// break;
// }
// }
// }
//
// let current_slot = self
// .chain
// .slot()
// .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
//
// if blocks_sent < (req.count as usize) {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "msg" => "Failed to return all requested blocks",
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// } else {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// }
//
// if send_response {
// // send the stream terminator
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(None),
// id: request_id,
// });
// }
//
// drop(send_on_drop);
// },
// "load_blocks_by_range_blocks",
// );
let forwards_blob_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
};
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.store.get_blobs(&root) {
Ok(Some(blob)) => {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(blob))),
id: request_id,
});
}
Ok(None) => {
error!(
self.log,
"Blob in the chain is not in the store";
"request_root" => ?root
);
break;
}
Err(e) => {
error!(
self.log,
"Error fetching blob for peer";
"block_root" => ?root,
"error" => ?e
);
break;
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blobs_sent < (req.count as usize) {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
} else {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(None),
id: request_id,
});
}
drop(send_on_drop);
},
"load_blocks_by_range_blocks",
);
}
}

View File

@@ -232,7 +232,7 @@ impl<T: BeaconChainTypes> Processor<T> {
&mut self,
peer_id: PeerId,
request_id: RequestId,
blob_wrapper: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>,
blob_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
) {
trace!(
self.log,
@@ -244,7 +244,7 @@ impl<T: BeaconChainTypes> Processor<T> {
self.send_to_sync(SyncMessage::RpcBlob {
peer_id,
request_id: id,
blob_sidecar: blob_wrapper,
blob_sidecar,
seen_timestamp: timestamp_now(),
});
} else {
@@ -285,6 +285,36 @@ impl<T: BeaconChainTypes> Processor<T> {
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
trace!(
self.log,
"Received BlockAndBlobssByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlockAndBlob {
peer_id,
request_id,
block_and_blobs,
seen_timestamp: timestamp_now(),
});
}
/// Process a gossip message declaring a new block.
///
/// Attempts to apply to block to the beacon chain. May queue the block for later processing.

View File

@@ -97,6 +97,22 @@ pub enum SyncMessage<T: EthSpec> {
seen_timestamp: Duration,
},
/// A blob has been received from the RPC.
RpcBlob {
request_id: RequestId,
peer_id: PeerId,
blob_sidecar: Option<Arc<BlobsSidecar<T>>>,
seen_timestamp: Duration,
},
/// A block and blobs have been received from the RPC.
RpcBlockAndBlob {
request_id: RequestId,
peer_id: PeerId,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T>>>,
seen_timestamp: Duration,
},
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, Arc<SignedBeaconBlock<T>>, Hash256),
@@ -729,7 +745,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&mut self,
request_id: RequestId,
peer_id: PeerId,
beacon_block: Option<SeansBlob>,
beacon_block: Option<Arc<BlobsSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
) {
let RequestId::RangeBlockBlob { id } = request_id else {