Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

This commit is contained in:
realbigsean
2024-06-20 09:36:56 -04:00
31 changed files with 310 additions and 329 deletions

View File

@@ -60,6 +60,7 @@ tempfile = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-channel = { workspace = true }
logging = { workspace = true }
[features]
libp2p-websocket = []

View File

@@ -374,6 +374,12 @@ where
id: outbound_info.req_id,
})));
}
// Also handle any events that are awaiting to be sent to the behaviour
if !self.events_out.is_empty() {
return Poll::Ready(Some(self.events_out.remove(0)));
}
Poll::Ready(None)
}

View File

@@ -316,6 +316,27 @@ where
self.events.push(error_msg);
}
}
// Replace the pending Requests to the disconnected peer
// with reports of failed requests.
self.events.iter_mut().for_each(|event| match &event {
ToSwarm::NotifyHandler {
peer_id: p,
event: RPCSend::Request(request_id, req),
..
} if *p == peer_id => {
*event = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id: connection_id,
event: HandlerEvent::Err(HandlerErr::Outbound {
id: *request_id,
proto: req.versioned_protocol().protocol(),
error: RPCError::Disconnected,
}),
});
}
_ => {}
});
}
}

View File

@@ -147,7 +147,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
Err((rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
queued_requests.push_back(rate_limited_req);
queued_requests.push_front(rate_limited_req);
// If one fails just wait for the next window that allows sending requests.
return;
}
@@ -205,3 +205,72 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
Poll::Pending
}
}
#[cfg(test)]
mod tests {
    use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
    use crate::rpc::rate_limiter::Quota;
    use crate::rpc::self_limiter::SelfRateLimiter;
    use crate::rpc::{OutboundRequest, Ping, Protocol};
    use crate::service::api_types::RequestId;
    use libp2p::PeerId;
    use std::time::Duration;
    use types::MainnetEthSpec;

    /// Checks that `next_peer_request_ready` keeps the delayed-request queue
    /// in FIFO order: after the rate-limit window refreshes, exactly one
    /// request moves to `ready_requests` and the remainder stay queued in
    /// their original sequence.
    #[tokio::test]
    async fn test_next_peer_request_ready() {
        // Permit one ping per two-second window so that only the first of
        // five requests is allowed through immediately.
        let limiter_config = OutboundRateLimiterConfig(RateLimiterConfig {
            ping_quota: Quota::n_every(1, 2),
            ..Default::default()
        });
        let logger = logging::test_logger();
        let mut limiter: SelfRateLimiter<RequestId<u64>, MainnetEthSpec> =
            SelfRateLimiter::new(limiter_config, logger).unwrap();
        let peer_id = PeerId::random();

        // Submit five pings; request 1 passes, requests 2..=5 get delayed.
        for data in 1..=5 {
            let _ = limiter.allows(
                peer_id,
                RequestId::Application(data),
                OutboundRequest::Ping(Ping { data }),
            );
        }

        {
            let delayed = limiter
                .delayed_requests
                .get(&(peer_id, Protocol::Ping))
                .unwrap();
            assert_eq!(4, delayed.len());
            // The queue must hold requests 2, 3, 4, 5 — oldest first.
            for (expected, queued) in (2..=5).zip(delayed.iter()) {
                assert_eq!(queued.request_id, RequestId::Application(expected));
            }
            assert_eq!(limiter.ready_requests.len(), 0);
        }

        // Let the rate-limiter tokens regenerate, then retry the queue.
        tokio::time::sleep(Duration::from_secs(3)).await;
        limiter.next_peer_request_ready(peer_id, Protocol::Ping);

        {
            let delayed = limiter
                .delayed_requests
                .get(&(peer_id, Protocol::Ping))
                .unwrap();
            assert_eq!(3, delayed.len());
            // Request 2 was released; 3, 4, 5 remain in order.
            for (expected, queued) in (3..=5).zip(delayed.iter()) {
                assert_eq!(queued.request_id, RequestId::Application(expected));
            }
            assert_eq!(limiter.ready_requests.len(), 1);
        }
    }
}