mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-16 20:39:10 +00:00
Merge branch 'unstable' into validator-manager
This commit is contained in:
@@ -1,10 +0,0 @@
|
||||
[package]
|
||||
name = "fallback"
|
||||
version = "0.1.0"
|
||||
authors = ["blacktemplar <blacktemplar@a1.net>"]
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.10.0"
|
||||
@@ -1,63 +0,0 @@
|
||||
use itertools::{join, zip};
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::future::Future;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Fallback<T> {
|
||||
pub servers: Vec<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum FallbackError<E> {
|
||||
AllErrored(Vec<E>),
|
||||
}
|
||||
|
||||
impl<T> Fallback<T> {
|
||||
pub fn new(servers: Vec<T>) -> Self {
|
||||
Self { servers }
|
||||
}
|
||||
|
||||
/// Return the first successful result along with number of previous errors encountered
|
||||
/// or all the errors encountered if every server fails.
|
||||
pub async fn first_success<'a, F, O, E, R>(
|
||||
&'a self,
|
||||
func: F,
|
||||
) -> Result<(O, usize), FallbackError<E>>
|
||||
where
|
||||
F: Fn(&'a T) -> R,
|
||||
R: Future<Output = Result<O, E>>,
|
||||
{
|
||||
let mut errors = vec![];
|
||||
for server in &self.servers {
|
||||
match func(server).await {
|
||||
Ok(val) => return Ok((val, errors.len())),
|
||||
Err(e) => errors.push(e),
|
||||
}
|
||||
}
|
||||
Err(FallbackError::AllErrored(errors))
|
||||
}
|
||||
|
||||
pub fn map_format_error<'a, E, F, S>(&'a self, f: F, error: &FallbackError<E>) -> String
|
||||
where
|
||||
F: FnMut(&'a T) -> &'a S,
|
||||
S: Display + 'a,
|
||||
E: Debug,
|
||||
{
|
||||
match error {
|
||||
FallbackError::AllErrored(v) => format!(
|
||||
"All fallbacks errored: {}",
|
||||
join(
|
||||
zip(self.servers.iter().map(f), v.iter())
|
||||
.map(|(server, error)| format!("{} => {:?}", server, error)),
|
||||
", "
|
||||
)
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Display> Fallback<T> {
|
||||
pub fn format_error<E: Debug>(&self, error: &FallbackError<E>) -> String {
|
||||
self.map_format_error(|s| s, error)
|
||||
}
|
||||
}
|
||||
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
|
||||
// NOTE: using --match instead of --exclude for compatibility with old Git
|
||||
"--match=thiswillnevermatchlol"
|
||||
],
|
||||
prefix = "Lighthouse/v3.1.0-",
|
||||
fallback = "Lighthouse/v3.1.0"
|
||||
prefix = "Lighthouse/v3.2.0-",
|
||||
fallback = "Lighthouse/v3.2.0"
|
||||
);
|
||||
|
||||
/// Returns `VERSION`, but with platform information appended to the end.
|
||||
|
||||
@@ -43,6 +43,16 @@ impl JsonMetric {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a default json value given given the metric type.
|
||||
fn get_typed_value_default(&self) -> serde_json::Value {
|
||||
match self.ty {
|
||||
JsonType::Integer => json!(0),
|
||||
JsonType::Boolean => {
|
||||
json!(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The required metrics for the beacon and validator processes.
|
||||
@@ -155,6 +165,16 @@ pub fn gather_metrics(metrics_map: &HashMap<String, JsonMetric>) -> Option<serde
|
||||
let _ = res.insert(metric.json_output_key.to_string(), value);
|
||||
};
|
||||
}
|
||||
// Insert default metrics for all monitoring service metrics that do not
|
||||
// exist as lighthouse metrics.
|
||||
for json_metric in metrics_map.values() {
|
||||
if !res.contains_key(json_metric.json_output_key) {
|
||||
let _ = res.insert(
|
||||
json_metric.json_output_key.to_string(),
|
||||
json_metric.get_typed_value_default(),
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(serde_json::Value::Object(res))
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//!
|
||||
//! This implementation may not be blazingly fast but it should be simple enough to be reliable.
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
@@ -13,9 +13,10 @@ pub enum Error {
|
||||
enum Future<T> {
|
||||
/// The future is ready and the item may be consumed.
|
||||
Ready(T),
|
||||
/// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to
|
||||
/// detect when the channel is disconnected.
|
||||
NotReady(Weak<()>),
|
||||
/// Future is not ready.
|
||||
NotReady,
|
||||
/// The sender has been dropped without sending a message.
|
||||
SenderDropped,
|
||||
}
|
||||
|
||||
struct MutexCondvar<T> {
|
||||
@@ -24,7 +25,7 @@ struct MutexCondvar<T> {
|
||||
}
|
||||
|
||||
/// The sending pair of the `oneshot` channel.
|
||||
pub struct Sender<T>(Arc<MutexCondvar<T>>, Option<Arc<()>>);
|
||||
pub struct Sender<T>(Arc<MutexCondvar<T>>);
|
||||
|
||||
impl<T> Sender<T> {
|
||||
/// Send a message, consuming `self` and delivering the message to *all* receivers.
|
||||
@@ -35,11 +36,15 @@ impl<T> Sender<T> {
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
/// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that
|
||||
/// the sender has been dropped.
|
||||
/// Flag the sender as dropped and notify all receivers.
|
||||
fn drop(&mut self) {
|
||||
self.1 = None;
|
||||
let mut lock = self.0.mutex.lock();
|
||||
if !matches!(*lock, Future::Ready(_)) {
|
||||
*lock = Future::SenderDropped
|
||||
}
|
||||
self.0.condvar.notify_all();
|
||||
// The lock must be held whilst the condvar is notified.
|
||||
drop(lock);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,8 +64,8 @@ impl<T: Clone> Receiver<T> {
|
||||
pub fn try_recv(&self) -> Result<Option<T>, Error> {
|
||||
match &*self.0.mutex.lock() {
|
||||
Future::Ready(item) => Ok(Some(item.clone())),
|
||||
Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None),
|
||||
Future::NotReady(_) => Err(Error::SenderDropped),
|
||||
Future::NotReady => Ok(None),
|
||||
Future::SenderDropped => Err(Error::SenderDropped),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,10 +76,8 @@ impl<T: Clone> Receiver<T> {
|
||||
loop {
|
||||
match &*lock {
|
||||
Future::Ready(item) => return Ok(item.clone()),
|
||||
Future::NotReady(weak) if weak.upgrade().is_some() => {
|
||||
self.0.condvar.wait(&mut lock)
|
||||
}
|
||||
Future::NotReady(_) => return Err(Error::SenderDropped),
|
||||
Future::NotReady => self.0.condvar.wait(&mut lock),
|
||||
Future::SenderDropped => return Err(Error::SenderDropped),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -84,13 +87,12 @@ impl<T: Clone> Receiver<T> {
|
||||
///
|
||||
/// The sender may send *only one* message which will be received by *all* receivers.
|
||||
pub fn oneshot<T: Clone>() -> (Sender<T>, Receiver<T>) {
|
||||
let sender_ref = Arc::new(());
|
||||
let mutex_condvar = Arc::new(MutexCondvar {
|
||||
mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))),
|
||||
mutex: Mutex::new(Future::NotReady),
|
||||
condvar: Condvar::new(),
|
||||
});
|
||||
let receiver = Receiver(mutex_condvar.clone());
|
||||
let sender = Sender(mutex_condvar, Some(sender_ref));
|
||||
let sender = Sender(mutex_condvar);
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user