mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 19:51:47 +00:00
Some sync/backfill format nits (#6861)
When working on unrelated changes I noted: - An unnecessary closure left by a commit of some guy named @dapplion that can be removed - match statements that can be simplified with the new let else syntax - instead of mapping a result to ignore the Ok value, return
This commit is contained in:
@@ -388,67 +388,59 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||
) -> Result<ProcessResult, BackFillError> {
|
||||
// check if we have this batch
|
||||
let batch = match self.batches.get_mut(&batch_id) {
|
||||
None => {
|
||||
if !matches!(self.state(), BackFillState::Failed) {
|
||||
// A batch might get removed when the chain advances, so this is non fatal.
|
||||
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
|
||||
}
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
Some(batch) => {
|
||||
// A batch could be retried without the peer failing the request (disconnecting/
|
||||
// sending an error /timeout) if the peer is removed from the chain for other
|
||||
// reasons. Check that this block belongs to the expected peer, and that the
|
||||
// request_id matches
|
||||
// TODO(das): removed peer_id matching as the node may request a different peer for data
|
||||
// columns.
|
||||
if !batch.is_expecting_block(&request_id) {
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
batch
|
||||
let Some(batch) = self.batches.get_mut(&batch_id) else {
|
||||
if !matches!(self.state(), BackFillState::Failed) {
|
||||
// A batch might get removed when the chain advances, so this is non fatal.
|
||||
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
|
||||
}
|
||||
return Ok(ProcessResult::Successful);
|
||||
};
|
||||
|
||||
{
|
||||
// A stream termination has been sent. This batch has ended. Process a completed batch.
|
||||
// Remove the request from the peer's active batches
|
||||
self.active_requests
|
||||
.get_mut(peer_id)
|
||||
.map(|active_requests| active_requests.remove(&batch_id));
|
||||
// A batch could be retried without the peer failing the request (disconnecting/
|
||||
// sending an error /timeout) if the peer is removed from the chain for other
|
||||
// reasons. Check that this block belongs to the expected peer, and that the
|
||||
// request_id matches
|
||||
// TODO(das): removed peer_id matching as the node may request a different peer for data
|
||||
// columns.
|
||||
if !batch.is_expecting_block(&request_id) {
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
|
||||
match batch.download_completed(blocks) {
|
||||
Ok(received) => {
|
||||
let awaiting_batches =
|
||||
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
|
||||
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);
|
||||
// A stream termination has been sent. This batch has ended. Process a completed batch.
|
||||
// Remove the request from the peer's active batches
|
||||
self.active_requests
|
||||
.get_mut(peer_id)
|
||||
.map(|active_requests| active_requests.remove(&batch_id));
|
||||
|
||||
// pre-emptively request more blocks from peers whilst we process current blocks,
|
||||
self.request_batches(network)?;
|
||||
self.process_completed_batches(network)
|
||||
}
|
||||
Err(result) => {
|
||||
let (expected_boundary, received_boundary, outcome) = match result {
|
||||
Err(e) => {
|
||||
return self
|
||||
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
||||
.map(|_| ProcessResult::Successful);
|
||||
}
|
||||
Ok(v) => v,
|
||||
};
|
||||
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
|
||||
match batch.download_completed(blocks) {
|
||||
Ok(received) => {
|
||||
let awaiting_batches =
|
||||
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
|
||||
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);
|
||||
|
||||
// pre-emptively request more blocks from peers whilst we process current blocks,
|
||||
self.request_batches(network)?;
|
||||
self.process_completed_batches(network)
|
||||
}
|
||||
Err(result) => {
|
||||
let (expected_boundary, received_boundary, outcome) = match result {
|
||||
Err(e) => {
|
||||
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
Ok(v) => v,
|
||||
};
|
||||
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
|
||||
"peer_id" => %peer_id, batch);
|
||||
|
||||
if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
|
||||
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
|
||||
return self
|
||||
.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
|
||||
.map(|_| ProcessResult::Successful);
|
||||
}
|
||||
// this batch can't be used, so we need to request it again.
|
||||
self.retry_batch_download(network, batch_id)
|
||||
.map(|_| ProcessResult::Successful)
|
||||
if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
|
||||
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
|
||||
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?;
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
// this batch can't be used, so we need to request it again.
|
||||
self.retry_batch_download(network, batch_id)?;
|
||||
Ok(ProcessResult::Successful)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -582,20 +574,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
}
|
||||
};
|
||||
|
||||
let peer = match batch.current_peer() {
|
||||
Some(v) => *v,
|
||||
None => {
|
||||
return self
|
||||
.fail_sync(BackFillError::BatchInvalidState(
|
||||
batch_id,
|
||||
String::from("Peer does not exist"),
|
||||
))
|
||||
.map(|_| ProcessResult::Successful)
|
||||
}
|
||||
let Some(peer) = batch.current_peer() else {
|
||||
self.fail_sync(BackFillError::BatchInvalidState(
|
||||
batch_id,
|
||||
String::from("Peer does not exist"),
|
||||
))?;
|
||||
return Ok(ProcessResult::Successful);
|
||||
};
|
||||
|
||||
debug!(self.log, "Backfill batch processed"; "result" => ?result, &batch,
|
||||
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
|
||||
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(peer));
|
||||
|
||||
match result {
|
||||
BatchProcessResult::Success {
|
||||
@@ -679,8 +667,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
{
|
||||
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
||||
}
|
||||
self.retry_batch_download(network, batch_id)
|
||||
.map(|_| ProcessResult::Successful)
|
||||
self.retry_batch_download(network, batch_id)?;
|
||||
Ok(ProcessResult::Successful)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -712,11 +700,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
// - AwaitingDownload -> A recoverable failed batch should have been
|
||||
// re-requested.
|
||||
// - Processing -> `self.current_processing_batch` is None
|
||||
return self
|
||||
.fail_sync(BackFillError::InvalidSyncState(String::from(
|
||||
"Invalid expected batch state",
|
||||
)))
|
||||
.map(|_| ProcessResult::Successful);
|
||||
self.fail_sync(BackFillError::InvalidSyncState(String::from(
|
||||
"Invalid expected batch state",
|
||||
)))?;
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
BatchState::AwaitingValidation(_) => {
|
||||
// TODO: I don't think this state is possible, log a CRIT just in case.
|
||||
@@ -731,12 +718,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return self
|
||||
.fail_sync(BackFillError::InvalidSyncState(format!(
|
||||
"Batch not found for current processing target {}",
|
||||
self.processing_target
|
||||
)))
|
||||
.map(|_| ProcessResult::Successful);
|
||||
self.fail_sync(BackFillError::InvalidSyncState(format!(
|
||||
"Batch not found for current processing target {}",
|
||||
self.processing_target
|
||||
)))?;
|
||||
return Ok(ProcessResult::Successful);
|
||||
}
|
||||
Ok(ProcessResult::Successful)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user