get things compiling

This commit is contained in:
realbigsean
2023-04-20 13:38:05 -04:00
parent 374ec4800a
commit c7142495fd
12 changed files with 369 additions and 194 deletions

View File

@@ -24,6 +24,7 @@ use super::{
};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
use crate::sync::block_lookups::single_block_lookup::LookupVerifyError;
mod parent_lookup;
mod single_block_lookup;
@@ -121,14 +122,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.single_block_lookups
.iter_mut()
.any(|(block_id, blob_id, single_block_request)| {
single_block_request.add_peer(&hash, &peer_id)
if single_block_request.requested_block_root == hash {
single_block_request.block_request_state.add_peer(&peer_id);
single_block_request.blob_request_state.add_peer(&peer_id);
return true;
}
false
})
{
return;
}
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.add_block_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
}) {
// If the block was already downloaded, or is being downloaded in this moment, do not
// request it.
@@ -187,7 +193,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
self.search_block_with(
|request| {
let _ = request.add_block_wrapper(block_root, block);
let _ = request.add_block_wrapper(block_root, block.clone());
},
block_root,
peer_id,
@@ -215,8 +221,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Make sure this block is not already downloaded, and that neither it or its parent is
// being searched for.
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.contains_block(&block_root)
|| parent_req.add_block_peer(&block_root, &peer_id)
parent_req.contains_block(&block_root) || parent_req.add_peer(&block_root, &peer_id)
}) {
// we are already searching for this block, ignore it
return;
@@ -247,62 +252,57 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
let stream_terminator = block.is_none().into();
let log = self.log.clone();
let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else {
let Some((triggered_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else {
return;
};
if let Err(error) = request_ref.verify_block(block).and_then(|root_block_opt| {
if let Some((root, block)) = root_block_opt {
// Only send for processing if we don't have parent requests that were triggered by
// this block.
let triggered_parent_request = self
.parent_lookups
.iter()
.any(|lookup| lookup.chain_hash() == root);
let should_remove = match request_ref.verify_block(block) {
Ok(Some((root, block))) => {
if triggered_parent_request {
// The lookup status here is irrelevant because we wait until the parent chain
// is complete before processing the block.
let _ = request_ref.add_block(root, block)?;
if let Err(e) = request_ref.add_block(root, block) {
Self::handle_block_lookup_verify_error(
id,
peer_id,
cx,
request_id_ref,
request_ref,
e,
&log,
)
} else {
false
}
} else {
// This is the correct block, send it for processing
if self
.send_block_for_processing(
root,
BlockWrapper::Block(block),
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
{
// Remove to avoid inconsistencies
self.single_block_lookups
.retain(|(block_id, _, _)| block_id != &Some(id));
}
self.send_block_for_processing(
root,
BlockWrapper::Block(block),
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
}
}
Ok(None) => false,
Err(e) => Self::handle_blob_lookup_verify_error(
id,
peer_id,
cx,
request_id_ref,
request_ref,
e,
&log,
),
};
Ok(())
}) {
let msg: &str = error.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
debug!(self.log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root);
// try the request again if possible
if let Ok((peer_id, request)) = request_ref.request_block() {
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
*request_id_ref = id;
} else {
self.single_block_lookups
.retain(|(block_id, _, _)| block_id != &Some(id));
}
} else {
self.single_block_lookups
.retain(|(block_id, _, _)| block_id != &Some(id));
}
if should_remove {
self.single_block_lookups
.retain(|(block_id, _, _)| block_id != &Some(id));
}
metrics::set_gauge(
@@ -321,60 +321,58 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
let stream_terminator = blob.is_none().into();
let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else {
let log = self.log.clone();
let Some((triggered_parent_request, request_id_ref, request_ref)) =
self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else {
return;
};
if let Err(error) = request_ref.verify_blob(blob).and_then(|root_blobs_opt| {
if let Some((block_root, blobs)) = root_blobs_opt {
// Only send for processing if we don't have parent requests that were triggered by
// this block.
let triggered_parent_request = self
.parent_lookups
.iter()
.any(|lookup| lookup.chain_hash() == block_root);
let should_remove = match request_ref.verify_blob(blob) {
Ok(Some((block_root, blobs))) => {
if triggered_parent_request {
// The lookup status here is irrelevant because we wait until the parent chain
// is complete before processing the block.
let _ = request_ref.add_blobs(block_root, blobs)?;
if let Err(e) = request_ref.add_blobs(block_root, blobs) {
Self::handle_blob_lookup_verify_error(
id,
peer_id,
cx,
request_id_ref,
request_ref,
e,
&log,
)
} else {
false
}
} else {
// These are the correct blobs, send them for processing
if self
.send_blobs_for_processing(
block_root,
blobs,
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
{
// Remove to avoid inconsistencies
self.single_block_lookups
.retain(|(_, blob_id, _)| blob_id != &Some(id));
}
self.send_blobs_for_processing(
block_root,
blobs,
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
}
}
Ok(())
}) {
let msg: &str = error.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
Ok(None) => false,
Err(e) => Self::handle_blob_lookup_verify_error(
id,
peer_id,
cx,
request_id_ref,
request_ref,
e,
&log,
),
};
debug!(self.log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root);
// try the request again if possible
if let Ok((peer_id, request)) = request_ref.request_blobs() {
if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) {
*request_id_ref = id;
} else {
self.single_block_lookups
.retain(|(_, blob_id, _)| blob_id != &Some(id));
}
} else {
self.single_block_lookups
.retain(|(_, blob_id, _)| blob_id != &Some(id));
}
if should_remove {
self.single_block_lookups
.retain(|(_, blob_id, _)| blob_id != &Some(id));
}
metrics::set_gauge(
@@ -383,16 +381,89 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
//TODO(sean) reduce duplicate code
/// Handles a verification failure for a single-**block** lookup response:
/// penalizes the responding peer, logs the failure, and attempts to re-issue
/// the block request.
///
/// Returns `true` when the lookup should be removed from
/// `single_block_lookups` (no retry could be issued), `false` when the
/// lookup should be kept alive (retry sent, or no retry currently needed).
///
/// NOTE(review): the `id` parameter is unused in this body — confirm whether
/// it should participate in retry bookkeeping or be dropped. See also the
/// `//TODO(sean) reduce duplicate code` — this mirrors the blob variant.
fn handle_block_lookup_verify_error(
id: Id,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
request_id_ref: &mut Id,
request_ref: &mut SingleBlockLookup<3, T>,
error: LookupVerifyError,
log: &Logger,
) -> bool {
let requested_block_root = request_ref.requested_block_root;
// Downscore the peer that sent the response which failed verification.
let msg: &str = error.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
debug!(log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => ?requested_block_root);
// try the request again if possible
match request_ref.request_block() {
Ok(Some((peer_id, request))) => {
// A retry target is available: send the request and record the
// new request id so later responses can be matched to this lookup.
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
*request_id_ref = id;
} else {
// Failed to send the retry — tell the caller to drop the lookup.
return true;
}
}
// No retry to send right now; keep the lookup alive.
Ok(None) => {}
Err(e) => {
// Retry construction failed (e.g. no further attempts possible) —
// log and tell the caller to drop the lookup.
debug!(log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => ?e, "block_root" => %requested_block_root);
return true;
}
}
false
}
/// Handles a verification failure for a single-**blob** lookup response:
/// penalizes the responding peer, logs the failure, and attempts to re-issue
/// the blobs request.
///
/// Returns `true` when the lookup should be removed from
/// `single_block_lookups` (no retry could be issued), `false` when the
/// lookup should be kept alive (retry sent, or no retry currently needed).
///
/// NOTE(review): the `id` parameter is unused in this body — confirm whether
/// it should participate in retry bookkeeping or be dropped. This is the
/// blob-side twin of `handle_block_lookup_verify_error` (see the
/// `//TODO(sean) reduce duplicate code`).
fn handle_blob_lookup_verify_error(
id: Id,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
request_id_ref: &mut Id,
request_ref: &mut SingleBlockLookup<3, T>,
error: LookupVerifyError,
log: &Logger,
) -> bool {
let requested_block_root = request_ref.requested_block_root;
// Downscore the peer that sent the response which failed verification.
let msg: &str = error.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
debug!(log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => ?requested_block_root);
// try the request again if possible
match request_ref.request_blobs() {
Ok(Some((peer_id, request))) => {
// A retry target is available: send the request and record the
// new request id so later responses can be matched to this lookup.
if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) {
*request_id_ref = id;
} else {
// Failed to send the retry — tell the caller to drop the lookup.
return true;
}
}
// No retry to send right now; keep the lookup alive.
Ok(None) => {}
Err(e) => {
// Retry construction failed (e.g. no further attempts possible) —
// log and tell the caller to drop the lookup.
debug!(log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => ?e, "block_root" => %requested_block_root);
return true;
}
}
false
}
fn find_single_lookup_request(
&mut self,
target_id: Id,
stream_terminator: StreamTerminator,
response_type: ResponseType,
) -> Option<(
bool,
&mut Id,
&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
)> {
let lookup: Option<(
bool,
&mut Id,
&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
)> = self
@@ -404,27 +475,36 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
ResponseType::Blob => blob_id_opt,
};
if let Some(lookup_id) = id_opt {
if lookup_id == target_id {
Some((lookup_id, req))
if *lookup_id == target_id {
// Only send for processing if we don't have parent requests that were triggered by
// this block.
let triggered_parent_request = self
.parent_lookups
.iter()
.any(|lookup| lookup.chain_hash() == req.requested_block_root);
return Some((triggered_parent_request, lookup_id, req));
}
}
None
});
let (id_ref, request) = match lookup {
Some((id_ref, req)) => (id_ref, req),
let (triggered_parent_request, id_ref, request) = match lookup {
Some((triggered_parent_request, id_ref, req)) => {
(triggered_parent_request, id_ref, req)
}
None => {
if matches!(StreamTerminator::False, stream_terminator) {
debug!(
self.log,
"Block returned for single block lookup not present";
"response_type" => response_type,
"response_type" => ?response_type,
);
}
return None;
}
};
Some((id_ref, request))
Some((triggered_parent_request, id_ref, request))
}
/// Process a response received from a parent lookup request.
@@ -451,7 +531,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_block(block, &mut self.failed_chains) {
Ok(Some((block_root, block))) => {
let process_or_search = parent_lookup.add_block(block_root, block);
let process_or_search = parent_lookup.add_block(block_root, block).unwrap(); //TODO(sean) fix
match process_or_search {
LookupDownloadStatus::Process(wrapper) => {
let chain_hash = parent_lookup.chain_hash();
@@ -486,7 +566,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_)
| ParentVerifyError::AvailabilityCheck(_) => {
| ParentVerifyError::AvailabilityCheck => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e);
@@ -545,7 +625,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_blob(blob, &mut self.failed_chains) {
Ok(Some((block_root, blobs))) => {
let processed_or_search = parent_lookup.add_blobs(block_root, blobs);
let processed_or_search = parent_lookup.add_blobs(block_root, blobs).unwrap(); //TODO(sean) fix
match processed_or_search {
LookupDownloadStatus::Process(wrapper) => {
let chain_hash = parent_lookup.chain_hash();
@@ -573,14 +653,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// processing result arrives.
self.parent_lookups.push(parent_lookup);
}
Err(e) => match e.into() {
Err(e) => match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_)
| ParentVerifyError::AvailabilityCheck(_) => {
| ParentVerifyError::AvailabilityCheck => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e);
@@ -1127,25 +1207,35 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.enumerate()
.find(|(index, (_, _, req))| req.requested_block_root == chain_hash)
{
self.single_block_lookups
if let Some((block_id, blob_id, block_wrapper)) = self
.single_block_lookups
.get_mut(index)
.and_then(|(_, _, lookup)| lookup.get_downloaded_block())
.map(|block_wrapper| {
// This is the correct block, send it for processing
if self
.send_block_for_processing(
chain_hash,
block_wrapper,
Duration::from_secs(0), //TODO(sean) pipe this through
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
{
// Remove to avoid inconsistencies
self.single_block_lookups.remove(index);
}
});
.and_then(|(block_id, blob_id, lookup)| {
lookup
.get_downloaded_block()
.map(|block| (block_id, blob_id, block))
})
{
let Some(id) = block_id.or(*blob_id) else {
warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash);
return;
};
// This is the correct block, send it for processing
if self
.send_block_for_processing(
chain_hash,
block_wrapper,
Duration::from_secs(0), //TODO(sean) pipe this through
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
{
// Remove to avoid inconsistencies
self.single_block_lookups.remove(index);
}
}
}
}
BatchProcessResult::FaultyFailure {