Merge branch 'unstable' of https://github.com/sigp/lighthouse into check-da-cache-in-rpc-response

Author: realbigsean
Date: 2024-02-14 21:55:45 -05:00
167 changed files with 19920 additions and 1494 deletions

@@ -35,7 +35,7 @@ lazy_static = { workspace = true }
 lighthouse_metrics = { workspace = true }
 logging = { workspace = true }
 task_executor = { workspace = true }
-igd = "0.12.1"
+igd-next = "0.14.3"
 itertools = { workspace = true }
 num_cpus = { workspace = true }
 lru_cache = { workspace = true }

@@ -66,7 +66,7 @@ pub fn construct_upnp_mappings<T: EthSpec>(
     log: slog::Logger,
 ) {
     info!(log, "UPnP Attempting to initialise routes");
-    match igd::search_gateway(Default::default()) {
+    match igd_next::search_gateway(Default::default()) {
         Err(e) => info!(log, "UPnP not available"; "error" => %e),
         Ok(gateway) => {
             // Need to find the local listening address matched with the router subnet
@@ -109,12 +109,12 @@ pub fn construct_upnp_mappings<T: EthSpec>(
             // router, they should ideally try to set different port numbers.
             mappings.tcp_port = add_port_mapping(
                 &gateway,
-                igd::PortMappingProtocol::TCP,
+                igd_next::PortMappingProtocol::TCP,
                 libp2p_socket,
                 "tcp",
                 &log,
             ).map(|_| {
-                let external_socket = external_ip.as_ref().map(|ip| SocketAddr::new((*ip).into(), config.tcp_port)).map_err(|_| ());
+                let external_socket = external_ip.as_ref().map(|ip| SocketAddr::new(*ip, config.tcp_port)).map_err(|_| ());
                 info!(log, "UPnP TCP route established"; "external_socket" => format!("{}:{}", external_socket.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), config.tcp_port));
                 config.tcp_port
             }).ok();
@@ -123,7 +123,7 @@ pub fn construct_upnp_mappings<T: EthSpec>(
             let udp_socket = SocketAddrV4::new(address, udp_port);
             add_port_mapping(
                 &gateway,
-                igd::PortMappingProtocol::UDP,
+                igd_next::PortMappingProtocol::UDP,
                 udp_socket,
                 "udp",
                 &log,
@@ -156,8 +156,8 @@ pub fn construct_upnp_mappings<T: EthSpec>(
 /// Sets up a port mapping for a protocol returning the mapped port if successful.
 fn add_port_mapping(
-    gateway: &igd::Gateway,
-    protocol: igd::PortMappingProtocol,
+    gateway: &igd_next::Gateway,
+    protocol: igd_next::PortMappingProtocol,
     socket: SocketAddrV4,
     protocol_string: &'static str,
     log: &slog::Logger,
@@ -168,10 +168,16 @@ fn add_port_mapping(
     // router, they should ideally try to set different port numbers.
     let mapping_string = &format!("lighthouse-{}", protocol_string);
     for _ in 0..2 {
-        match gateway.add_port(protocol, socket.port(), socket, 0, mapping_string) {
+        match gateway.add_port(
+            protocol,
+            socket.port(),
+            SocketAddr::V4(socket),
+            0,
+            mapping_string,
+        ) {
             Err(e) => {
                 match e {
-                    igd::AddPortError::PortInUse => {
+                    igd_next::AddPortError::PortInUse => {
                         // Try and remove and re-create
                         debug!(log, "UPnP port in use, attempting to remap"; "protocol" => protocol_string, "port" => socket.port());
                         match gateway.remove_port(protocol, socket.port()) {
@@ -202,10 +208,10 @@ fn add_port_mapping(
 pub fn remove_mappings(mappings: &EstablishedUPnPMappings, log: &slog::Logger) {
     if mappings.is_some() {
         debug!(log, "Removing UPnP port mappings");
-        match igd::search_gateway(Default::default()) {
+        match igd_next::search_gateway(Default::default()) {
             Ok(gateway) => {
                 if let Some(tcp_port) = mappings.tcp_port {
-                    match gateway.remove_port(igd::PortMappingProtocol::TCP, tcp_port) {
+                    match gateway.remove_port(igd_next::PortMappingProtocol::TCP, tcp_port) {
                         Ok(()) => debug!(log, "UPnP Removed TCP port mapping"; "port" => tcp_port),
                         Err(e) => {
                             debug!(log, "UPnP Failed to remove TCP port mapping"; "port" => tcp_port, "error" => %e)
@@ -213,7 +219,7 @@ pub fn remove_mappings(mappings: &EstablishedUPnPMappings, log: &slog::Logger) {
                     }
                 }
                 for udp_port in mappings.udp_ports() {
-                    match gateway.remove_port(igd::PortMappingProtocol::UDP, *udp_port) {
+                    match gateway.remove_port(igd_next::PortMappingProtocol::UDP, *udp_port) {
                         Ok(()) => debug!(log, "UPnP Removed UDP port mapping"; "port" => udp_port),
                         Err(e) => {
                             debug!(log, "UPnP Failed to remove UDP port mapping"; "port" => udp_port, "error" => %e)

@@ -1657,7 +1657,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 self.gossip_penalize_peer(
                     peer_id,
-                    PeerAction::LowToleranceError,
+                    PeerAction::HighToleranceError,
                     "light_client_gossip_error",
                 );
             }
@@ -1675,15 +1675,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     "light_client_gossip_error",
                 );
             }
-            LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!(
-                self.log,
-                "Light client finality update already seen";
-                "peer" => %peer_id,
-                "error" => ?e,
-            ),
-            LightClientFinalityUpdateError::BeaconChainError(_)
-            | LightClientFinalityUpdateError::LightClientUpdateError(_)
-            | LightClientFinalityUpdateError::SigSlotStartIsNone
+            LightClientFinalityUpdateError::SigSlotStartIsNone
             | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
                 self.log,
                 "Light client error constructing finality update";
@@ -1801,19 +1793,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     "light_client_gossip_error",
                 );
             }
-            LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => {
-                metrics::register_optimistic_update_error(&e);
-                debug!(
-                    self.log,
-                    "Light client optimistic update already seen";
-                    "peer" => %peer_id,
-                    "error" => ?e,
-                )
-            }
-            LightClientOptimisticUpdateError::BeaconChainError(_)
-            | LightClientOptimisticUpdateError::LightClientUpdateError(_)
-            | LightClientOptimisticUpdateError::SigSlotStartIsNone
+            LightClientOptimisticUpdateError::SigSlotStartIsNone
             | LightClientOptimisticUpdateError::FailedConstructingUpdate => {
                 metrics::register_optimistic_update_error(&e);

@@ -589,7 +589,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     }

     /// Create a new work event to process `LightClientBootstrap`s from the RPC network.
-    pub fn send_lightclient_bootstrap_request(
+    pub fn send_light_client_bootstrap_request(
         self: &Arc<Self>,
         peer_id: PeerId,
         request_id: PeerRequestId,

@@ -9,10 +9,8 @@ use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
 use beacon_chain::data_availability_checker::AvailabilityCheckError;
 use beacon_chain::data_availability_checker::MaybeAvailableBlock;
 use beacon_chain::{
-    observed_block_producers::Error as ObserveError,
-    validator_monitor::{get_block_delay_ms, get_slot_delay_ms},
-    AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError,
-    ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer,
+    validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainError,
+    BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer,
 };
 use beacon_processor::{
     work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage},
@@ -20,10 +18,8 @@ use beacon_processor::{
 };
 use lighthouse_network::PeerAction;
 use slog::{debug, error, info, warn};
-use slot_clock::SlotClock;
 use std::sync::Arc;
 use std::time::Duration;
-use std::time::{SystemTime, UNIX_EPOCH};
 use store::KzgCommitment;
 use tokio::sync::mpsc;
 use types::beacon_block_body::format_kzg_commitments;
@@ -142,72 +138,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             return;
         };

-        // Returns `true` if the time now is after the 4s attestation deadline.
-        let block_is_late = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            // If we can't read the system time clock then indicate that the
-            // block is late (and therefore should *not* be requeued). This
-            // avoids infinite loops.
-            .map_or(true, |now| {
-                get_block_delay_ms(now, block.message(), &self.chain.slot_clock)
-                    > self.chain.slot_clock.unagg_attestation_production_delay()
-            });
-
-        // Checks if a block from this proposer is already known.
-        let block_equivocates = || {
-            match self.chain.observed_slashable.read().is_slashable(
-                block.slot(),
-                block.message().proposer_index(),
-                block.canonical_root(),
-            ) {
-                Ok(is_slashable) => is_slashable,
-                //Both of these blocks will be rejected, so reject them now rather
-                // than re-queuing them.
-                Err(ObserveError::FinalizedBlock { .. })
-                | Err(ObserveError::ValidatorIndexTooHigh { .. }) => false,
-            }
-        };
-
-        // If we've already seen a block from this proposer *and* the block
-        // arrived before the attestation deadline, requeue it to ensure it is
-        // imported late enough that it won't receive a proposer boost.
-        //
-        // Don't requeue blocks if they're already known to fork choice, just
-        // push them through to block processing so they can be handled through
-        // the normal channels.
-        if !block_is_late && block_equivocates() {
-            debug!(
-                self.log,
-                "Delaying processing of duplicate RPC block";
-                "block_root" => ?block_root,
-                "proposer" => block.message().proposer_index(),
-                "slot" => block.slot()
-            );
-
-            // Send message to work reprocess queue to retry the block
-            let (process_fn, ignore_fn) = self.clone().generate_rpc_beacon_block_fns(
-                block_root,
-                block,
-                seen_timestamp,
-                process_type,
-            );
-            let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
-                beacon_block_root: block_root,
-                process_fn,
-                ignore_fn,
-            });
-
-            if reprocess_tx.try_send(reprocess_msg).is_err() {
-                error!(
-                    self.log,
-                    "Failed to inform block import";
-                    "source" => "rpc",
-                    "block_root" => %block_root
-                );
-            }
-            return;
-        }
-
         let slot = block.slot();
         let parent_root = block.message().parent_root();
         let commitments_formatted = block.as_block().commitments_formatted();

@@ -218,7 +218,7 @@ impl<T: BeaconChainTypes> Router<T> {
             ),
             Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result(
                 self.network_beacon_processor
-                    .send_lightclient_bootstrap_request(peer_id, request_id, request),
+                    .send_light_client_bootstrap_request(peer_id, request_id, request),
             ),
         }
     }

@@ -20,7 +20,7 @@ mod tests {
     fn get_topic_params(
         &self,
         topic: GossipTopic,
-    ) -> Option<&lighthouse_network::libp2p::gossipsub::TopicScoreParams> {
+    ) -> Option<&lighthouse_network::gossipsub::TopicScoreParams> {
         self.libp2p.get_topic_params(topic)
     }
 }
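The new return type works because lighthouse_network now exposes gossipsub at its crate root rather than through the bundled libp2p module. A sketch of the kind of re-export that would make the new path resolve; the actual module layout is an assumption, not shown in this diff:

// In lighthouse_network's lib.rs (assumed): re-export gossipsub at the root
// so callers can write `lighthouse_network::gossipsub::TopicScoreParams`.
pub use gossipsub;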

@@ -253,7 +253,7 @@ mod attestation_service {
             &attestation_service.beacon_chain.spec,
         )
         .unwrap();
-        let expected = vec![
+        let expected = [
             SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id)),
             SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id)),
         ];

@@ -332,7 +332,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                 }
                 // If we have run out of peers in which to retry this batch, the backfill state
                 // transitions to a paused state.
-                self.retry_batch_download(network, id)?;
+                // We still need to reset the state for all the affected batches, so we should not
+                // short circuit early
+                if self.retry_batch_download(network, id).is_err() {
+                    debug!(
+                        self.log,
+                        "Batch could not be retried";
+                        "batch_id" => id,
+                        "error" => "no synced peers"
+                    );
+                }
             } else {
                 debug!(self.log, "Batch not found while removing peer";
                     "peer" => %peer_id, "batch" => id)