From 1ea995963254ea4e4e8cd7f1a52d27d4a1b2e1ba Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 24 Mar 2019 15:18:21 +1100 Subject: [PATCH] Fix bug with block processing in sync --- beacon_node/network/src/sync/simple_sync.rs | 104 +++++++++++++------- 1 file changed, 66 insertions(+), 38 deletions(-) diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index b77a976b1d..369564a5e9 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -425,54 +425,45 @@ impl SimpleSync { } pub fn process_import_queue(&mut self, network: &mut NetworkContext) { - let mut blocks: Vec<(usize, BeaconBlock, PeerId)> = self - .import_queue - .partials - .iter() - .enumerate() - .filter_map(|(i, partial)| { - if let Some(_) = partial.body { - let (block, _root) = partial.clone().complete().expect("Body must be Some"); - Some((i, block, partial.sender.clone())) - } else { - None - } - }) - .collect(); + let mut successful = 0; + let mut invalid = 0; + let mut errored = 0; - if !blocks.is_empty() { - info!(self.log, "Processing blocks"; "count" => blocks.len()); - } - - // Sort the blocks to be in ascending slot order. - blocks.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); - - let mut keys_to_delete = vec![]; - - for (key, block, sender) in blocks { + // Loop through all of the complete blocks in the queue. + for (queue_index, block, sender) in self.import_queue.complete_blocks() { match self.chain.process_block(block) { Ok(outcome) => { if outcome.is_invalid() { - warn!(self.log, "Invalid block: {:?}", outcome); + invalid += 1; + warn!( + self.log, + "InvalidBlock"; + "sender_peer_id" => format!("{:?}", sender), + "reason" => format!("{:?}", outcome), + ); network.disconnect(sender); - keys_to_delete.push(key) - } else { - // TODO: don't delete if was not invalid but not successfully processed. - keys_to_delete.push(key) + } + + // If this results to true, the item will be removed from the queue. + if outcome.sucessfully_processed() { + successful += 1; + self.import_queue.partials.remove(queue_index); } } Err(e) => { - error!(self.log, "Error during block processing"; "error" => format!("{:?}", e)) + errored += 1; + error!(self.log, "BlockProcessingError"; "error" => format!("{:?}", e)); } } } - if !keys_to_delete.is_empty() { - info!(self.log, "Processed {} blocks", keys_to_delete.len()); - for key in keys_to_delete { - self.import_queue.partials.remove(key); - } - } + info!( + self.log, + "ProcessBlocks"; + "invalid" => invalid, + "successful" => successful, + "errored" => errored, + ) } fn request_block_roots( @@ -557,6 +548,35 @@ impl ImportQueue { } } + /// Completes all possible partials into `BeaconBlock` and returns them, sorted by slot number. + /// Does not delete the partials from the queue, this must be done manually. + /// + /// Returns `(queue_index, block, sender)`: + /// + /// - `queue_index`: used to remove the entry if it is successfully processed. + /// - `block`: the completed block. + /// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial. + pub fn complete_blocks(&self) -> Vec<(usize, BeaconBlock, PeerId)> { + let mut completable: Vec<(usize, &PartialBeaconBlock)> = self + .partials + .iter() + .enumerate() + .filter(|(_i, partial)| partial.completable()) + .collect(); + + // Sort the completable partials to be in ascending slot order. + completable.sort_unstable_by(|a, b| a.1.header.slot.partial_cmp(&b.1.header.slot).unwrap()); + + completable + .iter() + .map(|(i, partial)| { + let (block, _root, sender) = + (*partial).clone().complete().expect("Body must be Some"); + (*i, block, sender) + }) + .collect() + } + /// Flushes all stale entries from the queue. /// /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the @@ -696,8 +716,16 @@ pub struct PartialBeaconBlock { } impl PartialBeaconBlock { + pub fn completable(&self) -> bool { + self.body.is_some() + } + /// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root. - pub fn complete(self) -> Option<(BeaconBlock, Hash256)> { - Some((self.header.into_block(self.body?), self.block_root)) + pub fn complete(self) -> Option<(BeaconBlock, Hash256, PeerId)> { + Some(( + self.header.into_block(self.body?), + self.block_root, + self.sender, + )) } }