Pass vec to range sync batch (#5710)

* Pass vec to range sync batch
This commit is contained in:
Lion - dapplion
2024-06-27 20:21:40 +02:00
committed by GitHub
parent f14f21f37b
commit 784ef5fb43
5 changed files with 46 additions and 85 deletions

View File

@@ -369,7 +369,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
beacon_block: Option<RpcBlock<T::EthSpec>>, blocks: Vec<RpcBlock<T::EthSpec>>,
) -> Result<ProcessResult, BackFillError> { ) -> Result<ProcessResult, BackFillError> {
// check if we have this batch // check if we have this batch
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
@@ -392,20 +392,14 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
} }
}; };
if let Some(block) = beacon_block { {
// This is not a stream termination, simply add the block to the request
if let Err(e) = batch.add_block(block) {
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
Ok(ProcessResult::Successful)
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch. // A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches // Remove the request from the peer's active batches
self.active_requests self.active_requests
.get_mut(peer_id) .get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id)); .map(|active_requests| active_requests.remove(&batch_id));
match batch.download_completed() { match batch.download_completed(blocks) {
Ok(received) => { Ok(received) => {
let awaiting_batches = let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;

View File

@@ -908,39 +908,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
{ {
match resp.responses { match resp.responses {
Ok(blocks) => { Ok(blocks) => {
for block in blocks match resp.sender_id {
.into_iter() RangeRequestId::RangeSync { chain_id, batch_id } => {
.map(Some) self.range_sync.blocks_by_range_response(
// chain the stream terminator &mut self.network,
.chain(vec![None]) peer_id,
{ chain_id,
match resp.sender_id { batch_id,
RangeRequestId::RangeSync { chain_id, batch_id } => { id,
self.range_sync.blocks_by_range_response( blocks,
&mut self.network, );
peer_id, self.update_sync_state();
chain_id, }
batch_id, RangeRequestId::BackfillSync { batch_id } => {
id, match self.backfill_sync.on_block_response(
block, &mut self.network,
); batch_id,
self.update_sync_state(); &peer_id,
} id,
RangeRequestId::BackfillSync { batch_id } => { blocks,
match self.backfill_sync.on_block_response( ) {
&mut self.network, Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
batch_id, Ok(ProcessResult::Successful) => {}
&peer_id, Err(_error) => {
id, // The backfill sync has failed, errors are reported
block, // within.
) { self.update_sync_state();
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
} }
} }
} }

View File

@@ -116,7 +116,7 @@ pub enum BatchState<E: EthSpec> {
/// The batch has failed either downloading or processing, but can be requested again. /// The batch has failed either downloading or processing, but can be requested again.
AwaitingDownload, AwaitingDownload,
/// The batch is being downloaded. /// The batch is being downloaded.
Downloading(PeerId, Vec<RpcBlock<E>>, Id), Downloading(PeerId, Id),
/// The batch has been completely downloaded and is ready for processing. /// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>), AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
/// The batch is being processed. /// The batch is being processed.
@@ -199,7 +199,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
/// Verifies if an incoming block belongs to this batch. /// Verifies if an incoming block belongs to this batch.
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool { pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { if let BatchState::Downloading(expected_peer, expected_id) = &self.state {
return peer_id == expected_peer && expected_id == request_id; return peer_id == expected_peer && expected_id == request_id;
} }
false false
@@ -209,7 +209,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
pub fn current_peer(&self) -> Option<&PeerId> { pub fn current_peer(&self) -> Option<&PeerId> {
match &self.state { match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None, BatchState::AwaitingDownload | BatchState::Failed => None,
BatchState::Downloading(peer_id, _, _) BatchState::Downloading(peer_id, _)
| BatchState::AwaitingProcessing(peer_id, _) | BatchState::AwaitingProcessing(peer_id, _)
| BatchState::Processing(Attempt { peer_id, .. }) | BatchState::Processing(Attempt { peer_id, .. })
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
@@ -250,36 +250,18 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
&self.failed_processing_attempts &self.failed_processing_attempts
} }
/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: RpcBlock<E>) -> Result<(), WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
self.state = BatchState::Downloading(peer, blocks, req_id);
Ok(())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
self.state = other;
Err(WrongState(format!(
"Add block for batch in wrong state {:?}",
self.state
)))
}
}
}
/// Marks the batch as ready to be processed if the blocks are in the range. The number of /// Marks the batch as ready to be processed if the blocks are in the range. The number of
/// received blocks is returned, or the wrong batch end on failure /// received blocks is returned, or the wrong batch end on failure
#[must_use = "Batch may have failed"] #[must_use = "Batch may have failed"]
pub fn download_completed( pub fn download_completed(
&mut self, &mut self,
blocks: Vec<RpcBlock<E>>,
) -> Result< ) -> Result<
usize, /* Received blocks */ usize, /* Received blocks */
Result<(Slot, Slot, BatchOperationOutcome), WrongState>, Result<(Slot, Slot, BatchOperationOutcome), WrongState>,
> { > {
match self.state.poison() { match self.state.poison() {
BatchState::Downloading(peer, blocks, _request_id) => { BatchState::Downloading(peer, _request_id) => {
// verify that blocks are in range // verify that blocks are in range
if let Some(last_slot) = blocks.last().map(|b| b.slot()) { if let Some(last_slot) = blocks.last().map(|b| b.slot()) {
// the batch is non-empty // the batch is non-empty
@@ -336,7 +318,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
mark_failed: bool, mark_failed: bool,
) -> Result<BatchOperationOutcome, WrongState> { ) -> Result<BatchOperationOutcome, WrongState> {
match self.state.poison() { match self.state.poison() {
BatchState::Downloading(peer, _, _request_id) => { BatchState::Downloading(peer, _request_id) => {
// register the attempt and check if the batch can be tried again // register the attempt and check if the batch can be tried again
if mark_failed { if mark_failed {
self.failed_download_attempts.push(peer); self.failed_download_attempts.push(peer);
@@ -369,7 +351,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
) -> Result<(), WrongState> { ) -> Result<(), WrongState> {
match self.state.poison() { match self.state.poison() {
BatchState::AwaitingDownload => { BatchState::AwaitingDownload => {
self.state = BatchState::Downloading(peer, Vec::new(), request_id); self.state = BatchState::Downloading(peer, request_id);
Ok(()) Ok(())
} }
BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -536,13 +518,9 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
BatchState::AwaitingProcessing(ref peer, ref blocks) => { BatchState::AwaitingProcessing(ref peer, ref blocks) => {
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
} }
BatchState::Downloading(peer, blocks, request_id) => write!( BatchState::Downloading(peer, request_id) => {
f, write!(f, "Downloading({}, {})", peer, request_id)
"Downloading({}, {} blocks, {})", }
peer,
blocks.len(),
request_id
),
BatchState::Poisoned => f.write_str("Poisoned"), BatchState::Poisoned => f.write_str("Poisoned"),
} }
} }

View File

@@ -200,7 +200,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
beacon_block: Option<RpcBlock<T::EthSpec>>, blocks: Vec<RpcBlock<T::EthSpec>>,
) -> ProcessingResult { ) -> ProcessingResult {
// check if we have this batch // check if we have this batch
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
@@ -221,18 +221,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
} }
}; };
if let Some(block) = beacon_block { {
// This is not a stream termination, simply add the block to the request
batch.add_block(block)?;
Ok(KeepChain)
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch. // A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches // Remove the request from the peer's active batches
self.peers self.peers
.get_mut(peer_id) .get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id)); .map(|active_requests| active_requests.remove(&batch_id));
match batch.download_completed() { match batch.download_completed(blocks) {
Ok(received) => { Ok(received) => {
let awaiting_batches = batch_id let awaiting_batches = batch_id
.saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))

View File

@@ -210,11 +210,11 @@ where
chain_id: ChainId, chain_id: ChainId,
batch_id: BatchId, batch_id: BatchId,
request_id: Id, request_id: Id,
beacon_block: Option<RpcBlock<T::EthSpec>>, blocks: Vec<RpcBlock<T::EthSpec>>,
) { ) {
// check if this chunk removes the chain // check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| { match self.chains.call_by_id(chain_id, |chain| {
chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block) chain.on_block_response(network, batch_id, &peer_id, request_id, blocks)
}) { }) {
Ok((removed_chain, sync_type)) => { Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain { if let Some((removed_chain, remove_reason)) = removed_chain {
@@ -795,7 +795,7 @@ mod tests {
rig.cx.update_execution_engine_state(EngineState::Offline); rig.cx.update_execution_engine_state(EngineState::Offline);
// send the response to the request // send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, None); range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]);
// the beacon processor shouldn't have received any work // the beacon processor shouldn't have received any work
rig.expect_empty_processor(); rig.expect_empty_processor();
@@ -809,7 +809,7 @@ mod tests {
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt); rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
// send the response to the request // send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None); range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]);
// the beacon processor shouldn't have received any work // the beacon processor shouldn't have received any work
rig.expect_empty_processor(); rig.expect_empty_processor();