Update hashmap hashset to stable futures

This commit is contained in:
Age Manning
2020-04-28 14:10:58 +10:00
parent 84e634b129
commit d432378a3c
4 changed files with 81 additions and 78 deletions

View File

@@ -8,13 +8,17 @@
const DEFAULT_DELAY: u64 = 30;
use futures::prelude::*;
use std::collections::HashMap;
use std::time::Duration;
use tokio_timer::delay_queue::{self, DelayQueue};
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::delay_queue::{self, DelayQueue};
pub struct HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone,
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// The given entries.
entries: HashMap<K, MapEntry<V>>,
@@ -34,7 +38,7 @@ struct MapEntry<V> {
impl<K, V> Default for HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone,
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
fn default() -> Self {
HashMapDelay::new(Duration::from_secs(DEFAULT_DELAY))
@@ -43,7 +47,7 @@ where
impl<K, V> HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone,
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// Creates a new instance of `HashMapDelay`.
pub fn new(default_entry_timeout: Duration) -> Self {
@@ -139,23 +143,22 @@ where
impl<K, V> Stream for HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone,
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
V: Unpin,
{
type Item = (K, V);
type Error = &'static str;
type Item = Result<(K, V), String>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.expirations.poll() {
Ok(Async::Ready(Some(key))) => {
let key = key.into_inner();
match self.entries.remove(&key) {
Some(entry) => Ok(Async::Ready(Some((key, entry.value)))),
None => Err("Value no longer exists in expirations"),
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(key))) => match self.entries.remove(key.get_ref()) {
Some(entry) => Poll::Ready(Some(Ok((key.into_inner(), entry.value)))),
None => Poll::Ready(Some(Err("Value no longer exists in expirations".into()))),
},
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(format!("delay queue error: {:?}", e))))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err("Error polling HashMapDelay"),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}