mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-19 13:58:28 +00:00
Correct parent lookup (#1027)
* Correct parent-lookup with block gossip verification * Further update port conflicts in tests
This commit is contained in:
@@ -78,11 +78,11 @@ pub fn build_config(
|
||||
}
|
||||
|
||||
pub fn build_libp2p_instance(
|
||||
port: u16,
|
||||
boot_nodes: Vec<Enr>,
|
||||
secret_key: Option<String>,
|
||||
log: slog::Logger,
|
||||
) -> LibP2PService<E> {
|
||||
let port = unused_port("tcp").unwrap();
|
||||
let config = build_config(port, boot_nodes, secret_key);
|
||||
// launch libp2p service
|
||||
LibP2PService::new(&config, EnrForkId::default(), log.clone())
|
||||
@@ -101,8 +101,7 @@ pub fn get_enr(node: &LibP2PService<E>) -> Enr {
|
||||
#[allow(dead_code)]
|
||||
pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
||||
let mut nodes: Vec<LibP2PService<E>> = (0..n)
|
||||
.map(|_| unused_port("tcp").unwrap())
|
||||
.map(|p| build_libp2p_instance(p, vec![], None, log.clone()))
|
||||
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
||||
.collect();
|
||||
let multiaddrs: Vec<Multiaddr> = nodes
|
||||
.iter()
|
||||
@@ -125,15 +124,12 @@ pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
||||
// Constructs a pair of nodes with seperate loggers. The sender dials the receiver.
|
||||
// This returns a (sender, receiver) pair.
|
||||
#[allow(dead_code)]
|
||||
pub fn build_node_pair(
|
||||
log: &slog::Logger,
|
||||
start_port: u16,
|
||||
) -> (LibP2PService<E>, LibP2PService<E>) {
|
||||
pub fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PService<E>) {
|
||||
let sender_log = log.new(o!("who" => "sender"));
|
||||
let receiver_log = log.new(o!("who" => "receiver"));
|
||||
|
||||
let mut sender = build_libp2p_instance(start_port, vec![], None, sender_log);
|
||||
let receiver = build_libp2p_instance(start_port + 1, vec![], None, receiver_log);
|
||||
let mut sender = build_libp2p_instance(vec![], None, sender_log);
|
||||
let receiver = build_libp2p_instance(vec![], None, receiver_log);
|
||||
|
||||
let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone();
|
||||
match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr) {
|
||||
@@ -147,8 +143,7 @@ pub fn build_node_pair(
|
||||
#[allow(dead_code)]
|
||||
pub fn build_linear(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
||||
let mut nodes: Vec<LibP2PService<E>> = (0..n)
|
||||
.map(|_| unused_port("tcp").unwrap())
|
||||
.map(|p| build_libp2p_instance(p, vec![], None, log.clone()))
|
||||
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
||||
.collect();
|
||||
let multiaddrs: Vec<Multiaddr> = nodes
|
||||
.iter()
|
||||
|
||||
@@ -25,8 +25,7 @@ fn test_status_rpc() {
|
||||
let log = common::build_log(log_level, enable_logging);
|
||||
|
||||
// get sender/receiver
|
||||
let port = common::unused_port("tcp").unwrap();
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log, port);
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log);
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = RPCRequest::Status(StatusMessage {
|
||||
@@ -141,8 +140,7 @@ fn test_blocks_by_range_chunked_rpc() {
|
||||
let log = common::build_log(log_level, enable_logging);
|
||||
|
||||
// get sender/receiver
|
||||
let port = common::unused_port("tcp").unwrap();
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log, port);
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log);
|
||||
|
||||
// BlocksByRange Request
|
||||
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
||||
@@ -277,8 +275,7 @@ fn test_blocks_by_range_single_empty_rpc() {
|
||||
let log = common::build_log(log_level, enable_logging);
|
||||
|
||||
// get sender/receiver
|
||||
let port = common::unused_port("tcp").unwrap();
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log, port);
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log);
|
||||
|
||||
// BlocksByRange Request
|
||||
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
||||
@@ -414,8 +411,7 @@ fn test_blocks_by_root_chunked_rpc() {
|
||||
let spec = E::default_spec();
|
||||
|
||||
// get sender/receiver
|
||||
let port = common::unused_port("tcp").unwrap();
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log, port);
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log);
|
||||
|
||||
// BlocksByRoot Request
|
||||
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
||||
@@ -543,8 +539,7 @@ fn test_goodbye_rpc() {
|
||||
let log = common::build_log(log_level, enable_logging);
|
||||
|
||||
// get sender/receiver
|
||||
let port = common::unused_port("tcp").unwrap();
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log, port);
|
||||
let (mut sender, mut receiver) = common::build_node_pair(&log);
|
||||
|
||||
// Goodbye Request
|
||||
let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown);
|
||||
|
||||
@@ -60,7 +60,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
executor: &tokio::runtime::TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
|
||||
let message_handler_log = log.new(o!("service"=> "msg_handler"));
|
||||
let message_handler_log = log.new(o!("service"=> "router"));
|
||||
trace!(message_handler_log, "Service starting");
|
||||
|
||||
let (handler_send, handler_recv) = mpsc::unbounded_channel();
|
||||
@@ -262,16 +262,18 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
AttestationType::Unaggregated { should_store: true },
|
||||
);
|
||||
}
|
||||
PubsubMessage::BeaconBlock(block) => match self.processor.should_forward_block(block) {
|
||||
Ok(verified_block) => {
|
||||
self.propagate_message(id, peer_id.clone());
|
||||
self.processor.on_block_gossip(peer_id, verified_block);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(self.log, "Could not verify block for gossip";
|
||||
PubsubMessage::BeaconBlock(block) => {
|
||||
match self.processor.should_forward_block(&peer_id, block) {
|
||||
Ok(verified_block) => {
|
||||
self.propagate_message(id, peer_id.clone());
|
||||
self.processor.on_block_gossip(peer_id, verified_block);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(self.log, "Could not verify block for gossip";
|
||||
"error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
PubsubMessage::VoluntaryExit(_exit) => {
|
||||
// TODO: Apply more sophisticated validation
|
||||
self.propagate_message(id, peer_id.clone());
|
||||
|
||||
@@ -490,9 +490,17 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
/// across the network.
|
||||
pub fn should_forward_block(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
block: Box<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Result<GossipVerifiedBlock<T>, BlockError> {
|
||||
self.chain.verify_block_for_gossip(*block)
|
||||
let result = self.chain.verify_block_for_gossip(*block.clone());
|
||||
|
||||
if let Err(BlockError::ParentUnknown(_)) = result {
|
||||
// if we don't know the parent, start a parent lookup
|
||||
// TODO: Modify the return to avoid the block clone.
|
||||
self.send_to_sync(SyncMessage::UnknownBlock(peer_id.clone(), block));
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Process a gossip message declaring a new block.
|
||||
@@ -534,7 +542,8 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
}
|
||||
BlockProcessingOutcome::ParentUnknown { .. } => {
|
||||
// Inform the sync manager to find parents for this block
|
||||
debug!(self.log, "Block with unknown parent received";
|
||||
// This should not occur. It should be checked by `should_forward_block`
|
||||
error!(self.log, "Block with unknown parent attempted to be processed";
|
||||
"peer_id" => format!("{:?}",peer_id));
|
||||
self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user