get it to compile

This commit is contained in:
Diva M
2022-11-21 14:53:33 -05:00
parent e7ee79185b
commit 7ed2d35424
9 changed files with 106 additions and 62 deletions

View File

@@ -1334,6 +1334,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work)
}
Work::BlobsByRootsRequest {
peer_id,
request_id,
request,
} => todo!(),
}
}
}

View File

@@ -842,7 +842,8 @@ impl<T: BeaconChainTypes> Worker<T> {
"gossip_block_low",
);
return None;
}
}
Err(blob_errors) => unimplemented!("handle")
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

View File

@@ -499,6 +499,7 @@ impl<T: BeaconChainTypes> Worker<T> {
//FIXME(sean) create the blobs iter
/*
let forwards_blob_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
@@ -511,12 +512,13 @@ impl<T: BeaconChainTypes> Worker<T> {
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
// return self.send_error_response(
// peer_id,
// RPCResponseErrorCode::ResourceUnavailable,
// "Backfilling".into(),
// request_id,
// );
todo!("stuff")
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
};
@@ -546,7 +548,9 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
*/
// Fetching blocks is async because it may have to hit the execution layer for payloads.
/*
executor.spawn(
async move {
let mut blocks_sent = 0;
@@ -623,5 +627,7 @@ impl<T: BeaconChainTypes> Worker<T> {
},
"load_blocks_by_range_blocks",
);
*/
unimplemented!("")
}
}

View File

@@ -210,6 +210,7 @@ impl<T: BeaconChainTypes> Processor<T> {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
@@ -268,6 +269,8 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
@@ -298,6 +301,8 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};

View File

@@ -39,7 +39,7 @@ use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::{NetworkMessage, RequestId};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
@@ -68,6 +68,17 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32;
pub type Id = u32;
#[derive(Debug)]
pub struct SeansBlob {}
#[derive(Debug)]
pub struct SeansBlock {}
#[derive(Debug)]
pub struct SeansBlockBlob {
blob: SeansBlob,
block: SeansBlock,
}
/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId {
@@ -312,8 +323,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
RequestId::RangeBlockBlob { id } => {
if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(request_id)
{
if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(id) {
self.range_sync.inject_error(
&mut self.network,
peer_id,
@@ -617,6 +627,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network),
},
SyncMessage::RpcBlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp,
} => todo!(),
SyncMessage::RpcBlockAndBlob {
request_id,
peer_id,
block_and_blobs,
seen_timestamp,
} => todo!(),
}
}
@@ -736,7 +758,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
RequestId::RangeBlockBlob { id } => {
// do stuff
self.network.block_blob_block_response(id, block);
// self.network.block_blob_block_response(id, block);
}
}
}
@@ -749,10 +771,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
let RequestId::RangeBlockBlob { id } = request_id else {
return error!("bad stuff");
panic!("Wrong things going on ");
};
// get the paired block blob from the network context and send it to range
self.network.block_blob_blob_response(request_id, blob)
}
}

View File

@@ -178,10 +178,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blocks_request,
request_id,
})
.and_then(|| {
.and_then(|_| {
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: blocks_request,
request: blobs_request,
request_id,
})
})?;
@@ -247,55 +247,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: Id,
block: Option<SeansBlock>,
) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> {
let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?;
let response = match block {
Some(block) => match info.accumulated_blobs.pop_front() {
Some(blob) => Some(SeansBlockBlob { block, blob }),
None => {
// accumulate the block
info.accumulated_blocks.push_back(block);
None
}
},
None => {
info.is_blocks_rpc_finished = true;
if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
// this is the coupled stream termination
Some((chain_id, batch_id, None))
} else {
None
}
}
};
unimplemented!()
// let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?;
// match block {
// Some(block) => match info.accumulated_blobs.pop_front() {
// Some(blob) => Some(SeansBlockBlob { block, blob }),
// None => {
// // accumulate the block
// info.accumulated_blocks.push_back(block);
// None
// }
// },
// None => {
// info.is_blocks_rpc_finished = true;
//
// if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
// // this is the coupled stream termination
// Some((chain_id, batch_id, None))
// } else {
// None
// }
// }
// }
}
pub fn block_blob_blob_response(
&mut self,
request_id: Id,
blob: Option<SeansBlob>,
) -> Option<(ChainId, Option<SeansBlockBlob>)> {
let (chain_id, info) = self.block_blob_requests.get_mut(&request_id)?;
let response = match blob {
Some(blob) => match info.accumulated_blocks.pop_front() {
Some(block) => Some(SeansBlockBlob { block, blob }),
None => {
// accumulate the blob
info.accumulated_blobs.push_back(blob);
None
}
},
None => {
info.is_blobs_rpc_finished = true;
if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
// this is the coupled stream termination
Some((chain_id, batch_id, None))
} else {
None
}
}
};
) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> {
// let (batch_id, chain_id, info) = self.block_blob_requests.get_mut(&request_id)?;
// match blob {
// Some(blob) => match info.accumulated_blocks.pop_front() {
// Some(block) => Some(SeansBlockBlob { block, blob }),
// None => {
// // accumulate the blob
// info.accumulated_blobs.push_back(blob);
// None
// }
// },
// None => {
// info.is_blobs_rpc_finished = true;
//
// if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
// // this is the coupled stream termination
// Some((chain_id, batch_id, None))
// } else {
// None
// }
// }
// }
unimplemented!("do it")
}
/// Received a blocks by range response.