mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 05:44:30 +00:00
IDONTWANT message optimisation to cutoff for smaller messages (#6456)
* idontwant message opitmising * requested changes and linter appeasing * added the config cli flag * Merge branch 'unstable' into fix/idontwant-optimise * cli docs generated * const declaration * Hide extra technical cli flag * passing ci * Merge branch 'unstable' into fix/idontwant-optimise
This commit is contained in:
@@ -1812,9 +1812,6 @@ where
|
|||||||
// Calculate the message id on the transformed data.
|
// Calculate the message id on the transformed data.
|
||||||
let msg_id = self.config.message_id(&message);
|
let msg_id = self.config.message_id(&message);
|
||||||
|
|
||||||
// Broadcast IDONTWANT messages.
|
|
||||||
self.send_idontwant(&raw_message, &msg_id, propagation_source);
|
|
||||||
|
|
||||||
// Check the validity of the message
|
// Check the validity of the message
|
||||||
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
|
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
|
||||||
// and instead continually penalize peers that repeatedly send this message.
|
// and instead continually penalize peers that repeatedly send this message.
|
||||||
@@ -1830,6 +1827,12 @@ where
|
|||||||
self.mcache.observe_duplicate(&msg_id, propagation_source);
|
self.mcache.observe_duplicate(&msg_id, propagation_source);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast IDONTWANT messages
|
||||||
|
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
|
||||||
|
self.send_idontwant(&raw_message, &msg_id, propagation_source);
|
||||||
|
}
|
||||||
|
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
message=%msg_id,
|
message=%msg_id,
|
||||||
"Put message in duplicate_cache and resolve promises"
|
"Put message in duplicate_cache and resolve promises"
|
||||||
|
|||||||
@@ -5266,13 +5266,14 @@ fn sends_idontwant() {
|
|||||||
|
|
||||||
let message = RawMessage {
|
let message = RawMessage {
|
||||||
source: Some(peers[1]),
|
source: Some(peers[1]),
|
||||||
data: vec![12],
|
data: vec![12u8; 1024],
|
||||||
sequence_number: Some(0),
|
sequence_number: Some(0),
|
||||||
topic: topic_hashes[0].clone(),
|
topic: topic_hashes[0].clone(),
|
||||||
signature: None,
|
signature: None,
|
||||||
key: None,
|
key: None,
|
||||||
validated: true,
|
validated: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
gs.handle_received_message(message.clone(), &local_id);
|
gs.handle_received_message(message.clone(), &local_id);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
receivers
|
receivers
|
||||||
@@ -5292,6 +5293,48 @@ fn sends_idontwant() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn doesnt_sends_idontwant_for_lower_message_size() {
|
||||||
|
let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
|
||||||
|
.peer_no(5)
|
||||||
|
.topics(vec![String::from("topic1")])
|
||||||
|
.to_subscribe(true)
|
||||||
|
.gs_config(Config::default())
|
||||||
|
.explicit(1)
|
||||||
|
.peer_kind(PeerKind::Gossipsubv1_2)
|
||||||
|
.create_network();
|
||||||
|
|
||||||
|
let local_id = PeerId::random();
|
||||||
|
|
||||||
|
let message = RawMessage {
|
||||||
|
source: Some(peers[1]),
|
||||||
|
data: vec![12],
|
||||||
|
sequence_number: Some(0),
|
||||||
|
topic: topic_hashes[0].clone(),
|
||||||
|
signature: None,
|
||||||
|
key: None,
|
||||||
|
validated: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
gs.handle_received_message(message.clone(), &local_id);
|
||||||
|
assert_eq!(
|
||||||
|
receivers
|
||||||
|
.into_iter()
|
||||||
|
.fold(0, |mut idontwants, (peer_id, c)| {
|
||||||
|
let non_priority = c.non_priority.into_inner();
|
||||||
|
while !non_priority.is_empty() {
|
||||||
|
if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() {
|
||||||
|
assert_ne!(peer_id, peers[1]);
|
||||||
|
idontwants += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idontwants
|
||||||
|
}),
|
||||||
|
0,
|
||||||
|
"IDONTWANT was sent"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/// Test that a node doesn't send IDONTWANT messages to the mesh peers
|
/// Test that a node doesn't send IDONTWANT messages to the mesh peers
|
||||||
/// that don't run Gossipsub v1.2.
|
/// that don't run Gossipsub v1.2.
|
||||||
#[test]
|
#[test]
|
||||||
@@ -5316,6 +5359,7 @@ fn doesnt_send_idontwant() {
|
|||||||
key: None,
|
key: None,
|
||||||
validated: true,
|
validated: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
gs.handle_received_message(message.clone(), &local_id);
|
gs.handle_received_message(message.clone(), &local_id);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
receivers
|
receivers
|
||||||
|
|||||||
@@ -98,6 +98,7 @@ pub struct Config {
|
|||||||
connection_handler_queue_len: usize,
|
connection_handler_queue_len: usize,
|
||||||
connection_handler_publish_duration: Duration,
|
connection_handler_publish_duration: Duration,
|
||||||
connection_handler_forward_duration: Duration,
|
connection_handler_forward_duration: Duration,
|
||||||
|
idontwant_message_size_threshold: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
@@ -370,6 +371,16 @@ impl Config {
|
|||||||
pub fn forward_queue_duration(&self) -> Duration {
|
pub fn forward_queue_duration(&self) -> Duration {
|
||||||
self.connection_handler_forward_duration
|
self.connection_handler_forward_duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The message size threshold for which IDONTWANT messages are sent.
|
||||||
|
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
|
||||||
|
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
|
||||||
|
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
|
||||||
|
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
|
||||||
|
// default is 1kB
|
||||||
|
pub fn idontwant_message_size_threshold(&self) -> usize {
|
||||||
|
self.idontwant_message_size_threshold
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@@ -440,6 +451,7 @@ impl Default for ConfigBuilder {
|
|||||||
connection_handler_queue_len: 5000,
|
connection_handler_queue_len: 5000,
|
||||||
connection_handler_publish_duration: Duration::from_secs(5),
|
connection_handler_publish_duration: Duration::from_secs(5),
|
||||||
connection_handler_forward_duration: Duration::from_millis(1000),
|
connection_handler_forward_duration: Duration::from_millis(1000),
|
||||||
|
idontwant_message_size_threshold: 1000,
|
||||||
},
|
},
|
||||||
invalid_protocol: false,
|
invalid_protocol: false,
|
||||||
}
|
}
|
||||||
@@ -825,6 +837,17 @@ impl ConfigBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The message size threshold for which IDONTWANT messages are sent.
|
||||||
|
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
|
||||||
|
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
|
||||||
|
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
|
||||||
|
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
|
||||||
|
// default is 1kB
|
||||||
|
pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self {
|
||||||
|
self.config.idontwant_message_size_threshold = size;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Constructs a [`Config`] from the given configuration and validates the settings.
|
/// Constructs a [`Config`] from the given configuration and validates the settings.
|
||||||
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
|
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
|
||||||
// check all constraints on config
|
// check all constraints on config
|
||||||
@@ -895,6 +918,10 @@ impl std::fmt::Debug for Config {
|
|||||||
"published_message_ids_cache_time",
|
"published_message_ids_cache_time",
|
||||||
&self.published_message_ids_cache_time,
|
&self.published_message_ids_cache_time,
|
||||||
);
|
);
|
||||||
|
let _ = builder.field(
|
||||||
|
"idontwant_message_size_threhold",
|
||||||
|
&self.idontwant_message_size_threshold,
|
||||||
|
);
|
||||||
builder.finish()
|
builder.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
|
|||||||
pub const DEFAULT_TCP_PORT: u16 = 9000u16;
|
pub const DEFAULT_TCP_PORT: u16 = 9000u16;
|
||||||
pub const DEFAULT_DISC_PORT: u16 = 9000u16;
|
pub const DEFAULT_DISC_PORT: u16 = 9000u16;
|
||||||
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;
|
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;
|
||||||
|
pub const DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD: usize = 1000usize;
|
||||||
|
|
||||||
/// The maximum size of gossip messages.
|
/// The maximum size of gossip messages.
|
||||||
pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize {
|
pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize {
|
||||||
@@ -141,6 +142,10 @@ pub struct Config {
|
|||||||
|
|
||||||
/// Configuration for the inbound rate limiter (requests received by this node).
|
/// Configuration for the inbound rate limiter (requests received by this node).
|
||||||
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
|
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
|
||||||
|
|
||||||
|
/// Configuration for the minimum message size for which IDONTWANT messages are send in the mesh.
|
||||||
|
/// Lower the value reduces the optimization effect of the IDONTWANT messages.
|
||||||
|
pub idontwant_message_size_threshold: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
@@ -352,6 +357,7 @@ impl Default for Config {
|
|||||||
outbound_rate_limiter_config: None,
|
outbound_rate_limiter_config: None,
|
||||||
invalid_block_storage: None,
|
invalid_block_storage: None,
|
||||||
inbound_rate_limiter_config: None,
|
inbound_rate_limiter_config: None,
|
||||||
|
idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -433,6 +439,7 @@ pub fn gossipsub_config(
|
|||||||
gossipsub_config_params: GossipsubConfigParams,
|
gossipsub_config_params: GossipsubConfigParams,
|
||||||
seconds_per_slot: u64,
|
seconds_per_slot: u64,
|
||||||
slots_per_epoch: u64,
|
slots_per_epoch: u64,
|
||||||
|
idontwant_message_size_threshold: usize,
|
||||||
) -> gossipsub::Config {
|
) -> gossipsub::Config {
|
||||||
fn prefix(
|
fn prefix(
|
||||||
prefix: [u8; 4],
|
prefix: [u8; 4],
|
||||||
@@ -498,6 +505,7 @@ pub fn gossipsub_config(
|
|||||||
.duplicate_cache_time(duplicate_cache_time)
|
.duplicate_cache_time(duplicate_cache_time)
|
||||||
.message_id_fn(gossip_message_id)
|
.message_id_fn(gossip_message_id)
|
||||||
.allow_self_origin(true)
|
.allow_self_origin(true)
|
||||||
|
.idontwant_message_size_threshold(idontwant_message_size_threshold)
|
||||||
.build()
|
.build()
|
||||||
.expect("valid gossipsub configuration")
|
.expect("valid gossipsub configuration")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -237,6 +237,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
gossipsub_config_params,
|
gossipsub_config_params,
|
||||||
ctx.chain_spec.seconds_per_slot,
|
ctx.chain_spec.seconds_per_slot,
|
||||||
E::slots_per_epoch(),
|
E::slots_per_epoch(),
|
||||||
|
config.idontwant_message_size_threshold,
|
||||||
);
|
);
|
||||||
|
|
||||||
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
|
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
|
||||||
|
|||||||
@@ -659,7 +659,15 @@ pub fn cli_app() -> Command {
|
|||||||
.action(ArgAction::Set)
|
.action(ArgAction::Set)
|
||||||
.display_order(0)
|
.display_order(0)
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::new("idontwant-message-size-threshold")
|
||||||
|
.long("idontwant-message-size-threshold")
|
||||||
|
.help("Specifies the minimum message size for which IDONTWANT messages are sent. \
|
||||||
|
This an optimization strategy to not send IDONTWANT messages for smaller messages.")
|
||||||
|
.action(ArgAction::Set)
|
||||||
|
.hide(true)
|
||||||
|
.display_order(0)
|
||||||
|
)
|
||||||
/*
|
/*
|
||||||
* Monitoring metrics
|
* Monitoring metrics
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -1487,6 +1487,20 @@ pub fn set_network_config(
|
|||||||
Some(Default::default())
|
Some(Default::default())
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(idontwant_message_size_threshold) =
|
||||||
|
cli_args.get_one::<String>("idontwant-message-size-threshold")
|
||||||
|
{
|
||||||
|
config.idontwant_message_size_threshold = idontwant_message_size_threshold
|
||||||
|
.parse::<usize>()
|
||||||
|
.map_err(|_| {
|
||||||
|
format!(
|
||||||
|
"Invalid idontwant message size threshold value passed: {}",
|
||||||
|
idontwant_message_size_threshold
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user