Network crate update to stable futures

This commit is contained in:
Age Manning
2020-05-07 19:38:25 +10:00
parent d00fbcefd2
commit 1f2acad0df
8 changed files with 302 additions and 350 deletions

View File

@@ -5,13 +5,15 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{types::GossipKind, MessageId, NetworkGlobals, PeerId};
use futures::prelude::*;
use hashmap_delay::HashSetDelay;
use hashset_delay::HashSetDelay;
use rand::seq::SliceRandom;
use rest_types::ValidatorSubscription;
use slog::{crit, debug, error, o, warn};
use slot_clock::SlotClock;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use types::{Attestation, EthSpec, Slot, SubnetId};
@@ -609,57 +611,64 @@ impl<T: BeaconChainTypes> AttestationService<T> {
impl<T: BeaconChainTypes> Stream for AttestationService<T> {
type Item = AttServiceMessage;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// process any peer discovery events
while let Async::Ready(Some(exact_subnet)) =
self.discover_peers.poll().map_err(|e| {
error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e));
})?
{
self.handle_discover_peers(exact_subnet);
}
match self.discover_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any subscription events
while let Async::Ready(Some(exact_subnet)) = self.subscriptions.poll().map_err(|e| {
error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e));
})?
{
self.handle_subscriptions(exact_subnet);
}
match self.subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any un-subscription events
while let Async::Ready(Some(exact_subnet)) = self.unsubscriptions.poll().map_err(|e| {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e));
})?
{
self.handle_unsubscriptions(exact_subnet);
}
match self.unsubscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any random subnet expiries
while let Async::Ready(Some(subnet)) = self.random_subnets.poll().map_err(|e| {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e));
})?
{
self.handle_random_subnet_expiry(subnet);
}
match self.random_subnets.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(subnet))) => self.handle_random_subnet_expiry(subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any known validator expiries
while let Async::Ready(Some(_validator_index)) = self.known_validators.poll().map_err(|e| {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e));
})?
{
let _ = self.handle_known_validator_expiry();
}
match self.known_validators.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(_validator_index))) => {
let _ = self.handle_known_validator_expiry();
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}
// poll to remove entries on expiration, no need to act on expiration events
let _ = self.aggregate_validators_on_subnet.poll().map_err(|e| { error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e)); });
if let Poll::Ready(Some(Err(e))) = self.aggregate_validators_on_subnet.poll_next_unpin(cx) {
error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e));
}
// process any generated events
if let Some(event) = self.events.pop_front() {
return Ok(Async::Ready(Some(event)));
return Poll::Ready(Some(event));
}
Ok(Async::NotReady)
Poll::Pending
}
}