Add additional libp2p tests (#1867)

## Issue Addressed

N/A

## Proposed Changes

Adds tests for the eth2_libp2p crate.
This commit is contained in:
Pawan Dhananjay
2020-11-19 22:32:09 +00:00
parent 37369c6a56
commit e47739047d
7 changed files with 504 additions and 135 deletions

View File

@@ -556,13 +556,15 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Consume the discovery queue and initiate queries when applicable.
///
/// This also sanitizes the queue removing out-dated queries.
fn process_queue(&mut self) {
/// Returns `true` if any of the queued queries is processed and a discovery
/// query (Subnet or FindPeers) is started.
fn process_queue(&mut self) -> bool {
// Sanitize the queue, removing any out-dated subnet queries
self.queued_queries.retain(|query| !query.expired());
// use this to group subnet queries together for a single discovery request
let mut subnet_queries: Vec<SubnetQuery> = Vec::new();
let mut processed = false;
// Check that we are within our query concurrency limit
while !self.at_capacity() && !self.queued_queries.is_empty() {
// consume and process the query queue
@@ -579,6 +581,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
FIND_NODE_QUERY_CLOSEST_PEERS,
|_| true,
);
processed = true;
} else {
self.queued_queries.push_back(QueryType::FindPeers);
}
@@ -604,6 +607,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"subnets" => format!("{:?}", grouped_queries.iter().map(|q| q.subnet_id).collect::<Vec<_>>()),
);
self.start_subnet_query(grouped_queries);
processed = true;
}
}
None => {} // Queue is empty
@@ -611,6 +615,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
// Update the queue metric
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
processed
}
// Returns a boolean indicating if we are currently processing the maximum number of
@@ -724,111 +729,122 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.active_queries.push(Box::pin(query_future));
}
/// Process the completed QueryResult returned from discv5.
fn process_completed_queries(
&mut self,
query_result: QueryResult,
) -> Option<HashMap<PeerId, Option<Instant>>> {
match query_result.0 {
GroupedQueryType::FindPeers => {
self.find_peer_active = false;
match query_result.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Discovery query yielded no results.");
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<PeerId, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr.peer_id(), None);
});
return Some(results);
}
Err(e) => {
warn!(self.log, "Discovery query failed"; "error" => e.to_string());
}
}
}
GroupedQueryType::Subnet(queries) => {
let subnets_searched_for: Vec<SubnetId> =
queries.iter().map(|query| query.subnet_id).collect();
match query_result.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for));
}
Ok(r) => {
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for));
let mut mapped_results: HashMap<PeerId, Option<Instant>> = HashMap::new();
// cache the found ENR's
for enr in r.iter().cloned() {
self.cached_enrs.put(enr.peer_id(), enr);
}
// Map each subnet query's min_ttl to the set of ENR's returned for that subnet.
queries.iter().for_each(|query| {
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(
query.subnet_id,
query.min_ttl,
query.retries + 1,
);
// Check the specific subnet against the enr
let subnet_predicate =
subnet_predicate::<TSpec>(vec![query.subnet_id], &self.log);
r.iter()
.filter(|enr| subnet_predicate(enr))
.map(|enr| enr.peer_id())
.for_each(|peer_id| {
let other_min_ttl = mapped_results.get_mut(&peer_id);
// map peer IDs to the min_ttl furthest in the future
match (query.min_ttl, other_min_ttl) {
// update the mapping if the min_ttl is greater
(
Some(min_ttl_instant),
Some(Some(other_min_ttl_instant)),
) => {
if min_ttl_instant
.saturating_duration_since(*other_min_ttl_instant)
> DURATION_DIFFERENCE
{
*other_min_ttl_instant = min_ttl_instant;
}
}
// update the mapping if we have a specified min_ttl
(Some(min_ttl), Some(None)) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(Some(min_ttl), None) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(None, None) => {
mapped_results.insert(peer_id, None);
}
(None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl
(None, Some(None)) => {} // No-op because this is a duplicate
}
});
});
if mapped_results.is_empty() {
return None;
} else {
return Some(mapped_results);
}
}
Err(e) => {
warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string());
}
}
}
}
None
}
/// Drives the queries returning any results from completed queries.
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<PeerId, Option<Instant>>> {
while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) {
match query_future.0 {
GroupedQueryType::FindPeers => {
self.find_peer_active = false;
match query_future.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Discovery query yielded no results.");
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<PeerId, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr.peer_id(), None);
});
return Some(results);
}
Err(e) => {
warn!(self.log, "Discovery query failed"; "error" => e.to_string());
}
}
}
GroupedQueryType::Subnet(queries) => {
let subnets_searched_for: Vec<SubnetId> =
queries.iter().map(|query| query.subnet_id).collect();
match query_future.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for));
}
Ok(r) => {
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for));
let mut mapped_results: HashMap<PeerId, Option<Instant>> =
HashMap::new();
// cache the found ENR's
for enr in r.iter().cloned() {
self.cached_enrs.put(enr.peer_id(), enr);
}
// Map each subnet query's min_ttl to the set of ENR's returned for that subnet.
queries.iter().for_each(|query| {
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(
query.subnet_id,
query.min_ttl,
query.retries + 1,
);
// Check the specific subnet against the enr
let subnet_predicate =
subnet_predicate::<TSpec>(vec![query.subnet_id], &self.log);
r.iter()
.filter(|enr| subnet_predicate(enr))
.map(|enr| enr.peer_id())
.for_each(|peer_id| {
let other_min_ttl = mapped_results.get_mut(&peer_id);
// map peer IDs to the min_ttl furthest in the future
match (query.min_ttl, other_min_ttl) {
// update the mapping if the min_ttl is greater
(
Some(min_ttl_instant),
Some(Some(other_min_ttl_instant)),
) => {
if min_ttl_instant.saturating_duration_since(
*other_min_ttl_instant,
) > DURATION_DIFFERENCE
{
*other_min_ttl_instant = min_ttl_instant;
}
}
// update the mapping if we have a specified min_ttl
(Some(min_ttl), Some(None)) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(Some(min_ttl), None) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(None, None) => {
mapped_results.insert(peer_id, None);
}
(None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl
(None, Some(None)) => {} // No-op because this is a duplicate
}
});
});
if mapped_results.is_empty() {
return None;
} else {
return Some(mapped_results);
}
}
Err(e) => {
warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string());
}
}
}
while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) {
let result = self.process_completed_queries(query_result);
if result.is_some() {
return result;
}
}
None
@@ -904,3 +920,184 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::methods::MetaData;
use enr::EnrBuilder;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::MinimalEthSpec;
type E = MinimalEthSpec;
pub fn unused_port() -> u16 {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket");
let local_addr = socket.local_addr().expect("should read udp socket");
local_addr.port()
}
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}
async fn build_discovery() -> Discovery<E> {
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let mut config = NetworkConfig::default();
config.discovery_port = unused_port();
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
let enr: Enr = build_enr::<E>(&enr_key, &config, EnrForkId::default()).unwrap();
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
9000,
9000,
MetaData {
seq_number: 0,
attnets: Default::default(),
},
vec![],
&log,
);
Discovery::new(&keypair, &config, Arc::new(globals), &log)
.await
.unwrap()
}
#[tokio::test]
async fn test_add_subnet_query() {
let mut discovery = build_discovery().await;
let now = Instant::now();
let mut subnet_query = SubnetQuery {
subnet_id: SubnetId::new(1),
min_ttl: Some(now),
retries: 0,
};
discovery.add_subnet_query(
subnet_query.subnet_id,
subnet_query.min_ttl,
subnet_query.retries,
);
assert_eq!(
discovery.queued_queries.back(),
Some(&QueryType::Subnet(subnet_query.clone()))
);
// New query should replace old query
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
discovery.add_subnet_query(subnet_query.subnet_id, subnet_query.min_ttl, 1);
subnet_query.retries += 1;
assert_eq!(discovery.queued_queries.len(), 1);
assert_eq!(
discovery.queued_queries.pop_back(),
Some(QueryType::Subnet(subnet_query.clone()))
);
// Retries > MAX_DISCOVERY_RETRY must return immediately without adding
// anything.
discovery.add_subnet_query(
subnet_query.subnet_id,
subnet_query.min_ttl,
MAX_DISCOVERY_RETRY + 1,
);
assert_eq!(discovery.queued_queries.len(), 0);
}
#[tokio::test]
async fn test_process_queue() {
let mut discovery = build_discovery().await;
// FindPeers query is processed if there is no subnet query
discovery.queued_queries.push_back(QueryType::FindPeers);
assert!(discovery.process_queue());
let now = Instant::now();
let subnet_query = SubnetQuery {
subnet_id: SubnetId::new(1),
min_ttl: Some(now + Duration::from_secs(10)),
retries: 0,
};
// Refresh active queries
discovery.active_queries = Default::default();
// SubnetQuery is processed if it's the only queued query
discovery
.queued_queries
.push_back(QueryType::Subnet(subnet_query.clone()));
assert!(discovery.process_queue());
// SubnetQuery is processed if it's there is also 1 queued discovery query
discovery.queued_queries.push_back(QueryType::FindPeers);
discovery
.queued_queries
.push_back(QueryType::Subnet(subnet_query.clone()));
// Process Subnet query and FindPeers afterwards.
assert!(discovery.process_queue());
}
fn make_enr(subnet_ids: Vec<usize>) -> Enr {
let mut builder = EnrBuilder::new("v4");
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
// set the "attnets" field on our ENR
let mut bitfield = BitVector::<ssz_types::typenum::U64>::new();
for id in subnet_ids {
bitfield.set(id, true).unwrap();
}
builder.add_value(BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
builder.build(&enr_key).unwrap()
}
#[tokio::test]
async fn test_completed_subnet_queries() {
let mut discovery = build_discovery().await;
let now = Instant::now();
let instant1 = Some(now + Duration::from_secs(10));
let instant2 = Some(now + Duration::from_secs(5));
let query = GroupedQueryType::Subnet(vec![
SubnetQuery {
subnet_id: SubnetId::new(1),
min_ttl: instant1,
retries: 0,
},
SubnetQuery {
subnet_id: SubnetId::new(2),
min_ttl: instant2,
retries: 0,
},
]);
// Create enr which is subscribed to subnets 1 and 2
let enr1 = make_enr(vec![1, 2]);
let enr2 = make_enr(vec![2]);
// Unwanted enr for the given grouped query
let enr3 = make_enr(vec![3]);
let enrs: Vec<Enr> = vec![enr1.clone(), enr2.clone(), enr3.clone()];
let results = discovery
.process_completed_queries(QueryResult(query, Ok(enrs)))
.unwrap();
// enr1 and enr2 are required peers based on the requested subnet ids
assert_eq!(results.len(), 2);
// when a peer belongs to multiple subnet ids, we use the highest ttl.
assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1);
}
}