Remove hashmap delay

This commit is contained in:
Age Manning
2020-05-06 14:13:23 +10:00
parent 9599f6adda
commit c363ffc236
10 changed files with 20 additions and 193 deletions

6
Cargo.lock generated
View File

@@ -1319,7 +1319,7 @@ dependencies = [
"eth2_ssz_types",
"fnv",
"futures 0.3.4",
"hashmap_delay",
"hashset_delay",
"hex 0.4.2",
"lazy_static",
"libp2p",
@@ -1843,7 +1843,7 @@ dependencies = [
]
[[package]]
name = "hashmap_delay"
name = "hashset_delay"
version = "0.2.0"
dependencies = [
"futures 0.3.4",
@@ -3082,7 +3082,7 @@ dependencies = [
"fnv",
"futures 0.1.29",
"genesis",
"hashmap_delay",
"hashset_delay",
"hex 0.3.2",
"parking_lot 0.9.0",
"rand 0.7.3",

View File

@@ -14,7 +14,7 @@ members = [
"eth2/utils/eth2_testnet_config",
"eth2/utils/logging",
"eth2/utils/eth2_hashing",
"eth2/utils/hashmap_delay",
"eth2/utils/hashset_delay",
"eth2/utils/lighthouse_metrics",
"eth2/utils/merkle_proof",
"eth2/utils/int_to_bytes",

View File

@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
hex = "0.4.2"
types = { path = "../../eth2/types" }
hashmap_delay = { path = "../../eth2/utils/hashmap_delay" }
hashset_delay = { path = "../../eth2/utils/hashset_delay" }
eth2_ssz_types = { path = "../../eth2/utils/ssz_types" }
serde = { version = "1.0.106", features = ["derive"] }
serde_derive = "1.0.106"

View File

@@ -6,7 +6,7 @@ use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{NetworkGlobals, PeerId};
use futures::prelude::*;
use futures::Stream;
use hashmap_delay::HashSetDelay;
use hashset_delay::HashSetDelay;
use libp2p::identify::IdentifyInfo;
use slog::{crit, debug, error, warn};
use smallvec::SmallVec;

View File

@@ -13,7 +13,7 @@ tempdir = "0.3"
beacon_chain = { path = "../beacon_chain" }
store = { path = "../store" }
eth2-libp2p = { path = "../eth2-libp2p" }
hashmap_delay = { path = "../../eth2/utils/hashmap_delay" }
hashset_delay = { path = "../../eth2/utils/hashset_delay" }
rest_types = { path = "../../eth2/utils/rest_types" }
types = { path = "../../eth2/types" }
slot_clock = { path = "../../eth2/utils/slot_clock" }

View File

@@ -1,164 +0,0 @@
//! A simple hashmap object coupled with a `delay_queue` which has entries that expire after a
//! fixed time.
//!
//! A `HashMapDelay` implements `Stream` which removes expired items from the map.
/// The default delay for entries, in seconds. This is only used when `insert()` is used to add
/// entries.
const DEFAULT_DELAY: u64 = 30;
use futures::prelude::*;
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::delay_queue::{self, DelayQueue};
pub struct HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// The given entries.
entries: HashMap<K, MapEntry<V>>,
/// A queue holding the timeouts of each entry.
expirations: DelayQueue<K>,
/// The default expiration timeout of an entry.
default_entry_timeout: Duration,
}
/// A wrapping around entries that adds the link to the entry's expiration, via a `delay_queue` key.
struct MapEntry<V> {
/// The expiration key for the entry.
key: delay_queue::Key,
/// The actual entry.
value: V,
}
impl<K, V> Default for HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
fn default() -> Self {
HashMapDelay::new(Duration::from_secs(DEFAULT_DELAY))
}
}
impl<K, V> HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// Creates a new instance of `HashMapDelay`.
pub fn new(default_entry_timeout: Duration) -> Self {
HashMapDelay {
entries: HashMap::new(),
expirations: DelayQueue::new(),
default_entry_timeout,
}
}
/// Insert an entry into the mapping. Entries will expire after the `default_entry_timeout`.
pub fn insert(&mut self, key: K, value: V) {
self.insert_at(key, value, self.default_entry_timeout);
}
/// Inserts an entry that will expire at a given instant.
pub fn insert_at(&mut self, key: K, value: V, entry_duration: Duration) {
let delay_key = self.expirations.insert(key.clone(), entry_duration);
let entry = MapEntry {
key: delay_key,
value,
};
self.entries.insert(key, entry);
}
/// Gets a reference to an entry if it exists.
///
/// Returns None if the entry does not exist.
pub fn get(&self, key: &K) -> Option<&V> {
self.entries.get(key).map(|entry| &entry.value)
}
/// Gets a mutable reference to an entry if it exists.
///
/// Returns None if the entry does not exist.
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
self.entries.get_mut(key).map(|entry| &mut entry.value)
}
/// Returns true if the key exists, false otherwise.
pub fn contains_key(&self, key: &K) -> bool {
self.entries.contains_key(key)
}
/// Returns the length of the mapping.
pub fn len(&self) -> usize {
self.entries.len()
}
/// Updates the timeout for a given key. Returns true if the key existed, false otherwise.
///
/// Panics if the duration is too far in the future.
pub fn update_timeout(&mut self, key: &K, timeout: Duration) -> bool {
if let Some(entry) = self.entries.get(key) {
self.expirations.reset(&entry.key, timeout);
true
} else {
false
}
}
/// Removes a key from the map returning the value associated with the key that was in the map.
///
/// Return None if the key was not in the map.
pub fn remove(&mut self, key: &K) -> Option<V> {
if let Some(entry) = self.entries.remove(key) {
self.expirations.remove(&entry.key);
return Some(entry.value);
}
return None;
}
/// Retains only the elements specified by the predicate.
///
/// In other words, remove all pairs `(k, v)` such that `f(&k,&mut v)` returns false.
pub fn retain<F: FnMut(&K, &mut V) -> bool>(&mut self, mut f: F) {
let expiration = &mut self.expirations;
self.entries.retain(|key, entry| {
let result = f(key, &mut entry.value);
if !result {
expiration.remove(&entry.key);
}
result
})
}
/// Removes all entries from the map.
pub fn clear(&mut self) {
self.entries.clear();
self.expirations.clear();
}
}
impl<K, V> Stream for HashMapDelay<K, V>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
V: Unpin,
{
type Item = Result<(K, V), String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(key))) => match self.entries.remove(key.get_ref()) {
Some(entry) => Poll::Ready(Some(Ok((key.into_inner(), entry.value)))),
None => Poll::Ready(Some(Err("Value no longer exists in expirations".into()))),
},
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(format!("delay queue error: {:?}", e))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -1,21 +0,0 @@
//! This crate provides two objects:
//! - `HashMapDelay`
//! - `HashSetDelay`
//!
//! # HashMapDelay
//!
//! This provides a `HashMap` coupled with a `DelayQueue`. Objects that are inserted into
//! the map are inserted with an expiry. `Stream` is implemented on the `HashMapDelay`
//! which return objects that have expired. These objects are removed from the mapping.
//!
//! # HashSetDelay
//!
//! This is similar to a `HashMapDelay` except the mapping maps to the expiry time. This
//! allows users to add objects and check their expiry deadlines before the `Stream`
//! consumes them.
mod hashmap_delay;
mod hashset_delay;
pub use crate::hashmap_delay::HashMapDelay;
pub use crate::hashset_delay::HashSetDelay;

View File

@@ -1,5 +1,5 @@
[package]
name = "hashmap_delay"
name = "hashset_delay"
version = "0.2.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"

View File

@@ -0,0 +1,12 @@
//! This crate provides a single type (its counter-part HashMapDelay has been removed as it
//! currently is not in use in lighthouse):
//! - `HashSetDelay`
//!
//! # HashSetDelay
//!
//! This is similar to a `HashMapDelay` except the mapping maps to the expiry time. This
//! allows users to add objects and check their expiry deadlines before the `Stream`
//! consumes them.
mod hashset_delay;
pub use crate::hashset_delay::HashSetDelay;