Skip to content

Commit 8426241

Browse files
authored
test(iroh): Make endpoint_relay_connect_loop not flaky (#3402)
## Description

`endpoint_relay_connect_loop` was flaky before. This *should* fix this issue. It was last marked flaky (but for Windows only) in #3354.

What seems to have happened:

- The test spawns a relay server.
- The test spawns a server iroh Endpoint.
- The server Endpoint does a net report with QAD probes.
- For some reason the spawned relay server is very slow in responding to QAD requests (>3s).
- The QAD probes time out, and the server Endpoint ends up without a home relay.
- The test spawns a client iroh Endpoint.
- The client Endpoint tries to connect for 30s.
- The server Endpoint doesn't do another net report for 30s, though, so it never ends up being reachable.
- The client Endpoint times out.

To work around this, I'm starting the server endpoint and waiting for it to have a relay address. IMO this is reasonable to do in tests.

I've also made the tests use `Connection::close` and `Connection::closed` properly and removed the `SendStream::stopped` and `RecvStream::read_to_end(0)` calls.

## Notes

There are some other drive-by changes. Sorry about that, but IMO they're kinda too small for their own PRs:

- `net_report` thought that `Watchable::set` would return `Err` when there are no more watchers listening, but that's incorrect: `set` returns `Err` when the value being set is the same as the currently stored value.
- I've also made some small cosmetic changes to `net_report`.
- I've removed a 1600-byte allocation from the hot path of receiving relay items in `ActiveRelayActor`.

## Change checklist

<!-- Remove any that are not relevant. -->

- [x] Self-review.
1 parent e2cfde7 commit 8426241

File tree

3 files changed

+129
-138
lines changed

3 files changed

+129
-138
lines changed

iroh/src/endpoint.rs

Lines changed: 84 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -2298,43 +2298,44 @@ mod tests {
22982298
let server_secret_key = SecretKey::generate(rand::thread_rng());
22992299
let server_peer_id = server_secret_key.public();
23002300

2301-
let server = {
2302-
let relay_map = relay_map.clone();
2303-
tokio::spawn(
2304-
async move {
2305-
let ep = Endpoint::builder()
2306-
.secret_key(server_secret_key)
2307-
.alpns(vec![TEST_ALPN.to_vec()])
2308-
.relay_mode(RelayMode::Custom(relay_map))
2309-
.insecure_skip_relay_cert_verify(true)
2310-
.bind()
2311-
.await?;
2312-
info!("accepting connection");
2313-
let incoming = ep.accept().await.e()?;
2314-
let conn = incoming.await.e()?;
2315-
let mut stream = conn.accept_uni().await.e()?;
2316-
let mut buf = [0u8; 5];
2317-
stream.read_exact(&mut buf).await.e()?;
2318-
info!("Accepted 1 stream, received {buf:?}. Closing now.");
2319-
// close the connection
2320-
conn.close(7u8.into(), b"bye");
2321-
2322-
let res = conn.accept_uni().await;
2323-
assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
2324-
2325-
let res = stream.read_to_end(10).await;
2326-
assert_eq!(
2327-
res.unwrap_err(),
2328-
quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
2329-
quinn::ConnectionError::LocallyClosed
2330-
))
2331-
);
2332-
info!("server test completed");
2333-
Ok::<_, Error>(())
2334-
}
2335-
.instrument(info_span!("test-server")),
2336-
)
2337-
};
2301+
// Wait for the endpoint to be started to make sure it's up before clients try to connect
2302+
let ep = Endpoint::builder()
2303+
.secret_key(server_secret_key)
2304+
.alpns(vec![TEST_ALPN.to_vec()])
2305+
.relay_mode(RelayMode::Custom(relay_map.clone()))
2306+
.insecure_skip_relay_cert_verify(true)
2307+
.bind()
2308+
.await?;
2309+
// Wait for the endpoint to be reachable via relay
2310+
ep.home_relay().initialized().await?;
2311+
2312+
let server = tokio::spawn(
2313+
async move {
2314+
info!("accepting connection");
2315+
let incoming = ep.accept().await.e()?;
2316+
let conn = incoming.await.e()?;
2317+
let mut stream = conn.accept_uni().await.e()?;
2318+
let mut buf = [0u8; 5];
2319+
stream.read_exact(&mut buf).await.e()?;
2320+
info!("Accepted 1 stream, received {buf:?}. Closing now.");
2321+
// close the connection
2322+
conn.close(7u8.into(), b"bye");
2323+
2324+
let res = conn.accept_uni().await;
2325+
assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
2326+
2327+
let res = stream.read_to_end(10).await;
2328+
assert_eq!(
2329+
res.unwrap_err(),
2330+
quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
2331+
quinn::ConnectionError::LocallyClosed
2332+
))
2333+
);
2334+
info!("server test completed");
2335+
Ok::<_, Error>(())
2336+
}
2337+
.instrument(info_span!("test-server")),
2338+
);
23382339

23392340
let client = tokio::spawn(
23402341
async move {
@@ -2439,11 +2440,10 @@ mod tests {
24392440
Ok(())
24402441
}
24412442

2442-
#[cfg_attr(windows, ignore = "flaky")]
24432443
#[tokio::test]
24442444
#[traced_test]
24452445
async fn endpoint_relay_connect_loop() -> Result {
2446-
let start = Instant::now();
2446+
let test_start = Instant::now();
24472447
let n_clients = 5;
24482448
let n_chunks_per_client = 2;
24492449
let chunk_size = 10;
@@ -2452,65 +2452,67 @@ mod tests {
24522452
let server_secret_key = SecretKey::generate(&mut rng);
24532453
let server_node_id = server_secret_key.public();
24542454

2455+
// Make sure the server is bound before having clients connect to it:
2456+
let ep = Endpoint::builder()
2457+
.insecure_skip_relay_cert_verify(true)
2458+
.secret_key(server_secret_key)
2459+
.alpns(vec![TEST_ALPN.to_vec()])
2460+
.relay_mode(RelayMode::Custom(relay_map.clone()))
2461+
.bind()
2462+
.await?;
2463+
// Also make sure the server has a working relay connection
2464+
ep.home_relay().initialized().await?;
2465+
2466+
info!(time = ?test_start.elapsed(), "test setup done");
2467+
24552468
// The server accepts the connections of the clients sequentially.
2456-
let server = {
2457-
let relay_map = relay_map.clone();
2458-
tokio::spawn(
2459-
async move {
2460-
let ep = Endpoint::builder()
2461-
.insecure_skip_relay_cert_verify(true)
2462-
.secret_key(server_secret_key)
2463-
.alpns(vec![TEST_ALPN.to_vec()])
2464-
.relay_mode(RelayMode::Custom(relay_map))
2465-
.bind()
2466-
.await?;
2467-
let eps = ep.bound_sockets();
2468-
2469-
info!(me = %ep.node_id().fmt_short(), eps = ?eps, "server listening on");
2470-
for i in 0..n_clients {
2471-
let round_start = Instant::now();
2472-
info!("[server] round {i}");
2473-
let incoming = ep.accept().await.e()?;
2474-
let conn = incoming.await.e()?;
2475-
let node_id = conn.remote_node_id()?;
2476-
info!(%i, peer = %node_id.fmt_short(), "accepted connection");
2477-
let (mut send, mut recv) = conn.accept_bi().await.e()?;
2478-
let mut buf = vec![0u8; chunk_size];
2479-
for _i in 0..n_chunks_per_client {
2480-
recv.read_exact(&mut buf).await.e()?;
2481-
send.write_all(&buf).await.e()?;
2482-
}
2483-
send.finish().e()?;
2484-
send.stopped().await.e()?;
2485-
recv.read_to_end(0).await.e()?;
2486-
info!(%i, peer = %node_id.fmt_short(), "finished");
2487-
info!("[server] round {i} done in {:?}", round_start.elapsed());
2469+
let server = tokio::spawn(
2470+
async move {
2471+
let eps = ep.bound_sockets();
2472+
2473+
info!(me = %ep.node_id().fmt_short(), eps = ?eps, "server listening on");
2474+
for i in 0..n_clients {
2475+
let round_start = Instant::now();
2476+
info!("[server] round {i}");
2477+
let incoming = ep.accept().await.e()?;
2478+
let conn = incoming.await.e()?;
2479+
let node_id = conn.remote_node_id()?;
2480+
info!(%i, peer = %node_id.fmt_short(), "accepted connection");
2481+
let (mut send, mut recv) = conn.accept_bi().await.e()?;
2482+
let mut buf = vec![0u8; chunk_size];
2483+
for _i in 0..n_chunks_per_client {
2484+
recv.read_exact(&mut buf).await.e()?;
2485+
send.write_all(&buf).await.e()?;
24882486
}
2489-
Ok::<_, Error>(())
2487+
send.finish().e()?;
2488+
conn.closed().await; // we're the last to send data, so we wait for the other side to close
2489+
info!(%i, peer = %node_id.fmt_short(), "finished");
2490+
info!("[server] round {i} done in {:?}", round_start.elapsed());
24902491
}
2491-
.instrument(error_span!("server")),
2492-
)
2493-
};
2492+
Ok::<_, Error>(())
2493+
}
2494+
.instrument(error_span!("server")),
2495+
);
2496+
2497+
let start = Instant::now();
24942498

24952499
for i in 0..n_clients {
24962500
let round_start = Instant::now();
2497-
info!("[client] round {}", i);
2498-
let relay_map = relay_map.clone();
2501+
info!("[client] round {i}");
24992502
let client_secret_key = SecretKey::generate(&mut rng);
2500-
let relay_url = relay_url.clone();
25012503
async {
25022504
info!("client binding");
25032505
let ep = Endpoint::builder()
25042506
.alpns(vec![TEST_ALPN.to_vec()])
25052507
.insecure_skip_relay_cert_verify(true)
2506-
.relay_mode(RelayMode::Custom(relay_map))
2508+
.relay_mode(RelayMode::Custom(relay_map.clone()))
25072509
.secret_key(client_secret_key)
25082510
.bind()
25092511
.await?;
25102512
let eps = ep.bound_sockets();
25112513

25122514
info!(me = %ep.node_id().fmt_short(), eps=?eps, "client bound");
2513-
let node_addr = NodeAddr::new(server_node_id).with_relay_url(relay_url);
2515+
let node_addr = NodeAddr::new(server_node_id).with_relay_url(relay_url.clone());
25142516
info!(to = ?node_addr, "client connecting");
25152517
let conn = ep.connect(node_addr, TEST_ALPN).await.e()?;
25162518
info!("client connected");
@@ -2522,9 +2524,8 @@ mod tests {
25222524
recv.read_exact(&mut buf).await.e()?;
25232525
assert_eq!(buf, vec![i; chunk_size]);
25242526
}
2525-
send.finish().e()?;
2526-
send.stopped().await.e()?;
2527-
recv.read_to_end(0).await.e()?;
2527+
// we're the last to receive data, so we close
2528+
conn.close(0u32.into(), b"bye!");
25282529
info!("client finished");
25292530
ep.close().await;
25302531
info!("client closed");

iroh/src/magicsock/transports/relay/actor.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -615,15 +615,9 @@ impl ActiveRelayActor {
615615
break Ok(());
616616
};
617617
self.reset_inactive_timeout();
618-
// TODO: This allocation is *very* unfortunate. But so is the
619-
// allocation *inside* of PacketizeIter...
620-
let batch = std::mem::replace(
621-
&mut send_datagrams_buf,
622-
Vec::with_capacity(SEND_DATAGRAM_BATCH_SIZE),
623-
);
624618
// TODO(frando): can we avoid the clone here?
625619
let metrics = self.metrics.clone();
626-
let packet_iter = batch.into_iter().map(|item| {
620+
let packet_iter = send_datagrams_buf.drain(..).map(|item| {
627621
metrics.send_relay.inc_by(item.datagrams.contents.len() as _);
628622
Ok(ClientToRelayMsg::Datagrams {
629623
dst_node_id: item.remote_node,

iroh/src/net_report.rs

Lines changed: 44 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ impl Client {
450450
PROBES_TIMEOUT,
451451
run_probe_v4(ip_mapped_addrs, relay_node, quic_client, dns_resolver),
452452
))
453-
.instrument(info_span!("QAD IPv6", %relay_url)),
453+
.instrument(info_span!("QAD-IPv4", %relay_url)),
454454
);
455455
}
456456

@@ -468,7 +468,7 @@ impl Client {
468468
PROBES_TIMEOUT,
469469
run_probe_v6(ip_mapped_addrs, relay_node, quic_client, dns_resolver),
470470
))
471-
.instrument(info_span!("QAD IPv6", %relay_url)),
471+
.instrument(info_span!("QAD-IPv6", %relay_url)),
472472
);
473473
}
474474
}
@@ -700,30 +700,28 @@ async fn run_probe_v4(
700700
};
701701

702702
let observer = Watchable::new(None);
703-
let ob = observer.clone();
704703
let node = relay_node.url.clone();
705-
let conn2 = conn.clone();
706-
let handle = task::spawn(async move {
707-
loop {
708-
let val = *receiver.borrow();
709-
// if we've sent to an ipv4 address, but received an observed address
710-
// that is ivp6 then the address is an [IPv4-Mapped IPv6 Addresses](https://doc.rust-lang.org/beta/std/net/struct.Ipv6Addr.html#ipv4-mapped-ipv6-addresses)
711-
let val = val.map(|val| SocketAddr::new(val.ip().to_canonical(), val.port()));
712-
let latency = conn2.rtt();
713-
trace!(?val, ?relay_addr, ?latency, "got addr V4");
714-
if ob
715-
.set(val.map(|addr| QadProbeReport {
716-
node: node.clone(),
717-
addr,
718-
latency,
719-
}))
720-
.is_err()
721-
{
722-
// cancel if the observer is gone
723-
break;
724-
}
725-
if receiver.changed().await.is_err() {
726-
break;
704+
let handle = task::spawn({
705+
let conn = conn.clone();
706+
let observer = observer.clone();
707+
async move {
708+
loop {
709+
let val = *receiver.borrow();
710+
// if we've sent to an ipv4 address, but received an observed address
711+
// that is ivp6 then the address is an [IPv4-Mapped IPv6 Addresses](https://doc.rust-lang.org/beta/std/net/struct.Ipv6Addr.html#ipv4-mapped-ipv6-addresses)
712+
let val = val.map(|val| SocketAddr::new(val.ip().to_canonical(), val.port()));
713+
let latency = conn.rtt();
714+
trace!(?val, ?relay_addr, ?latency, "got addr V4");
715+
observer
716+
.set(val.map(|addr| QadProbeReport {
717+
node: node.clone(),
718+
addr,
719+
latency,
720+
}))
721+
.ok();
722+
if receiver.changed().await.is_err() {
723+
break;
724+
}
727725
}
728726
}
729727
});
@@ -769,30 +767,28 @@ async fn run_probe_v6(
769767
};
770768

771769
let observer = Watchable::new(None);
772-
let ob = observer.clone();
773770
let node = relay_node.url.clone();
774-
let conn2 = conn.clone();
775-
let handle = task::spawn(async move {
776-
loop {
777-
let val = *receiver.borrow();
778-
// if we've sent to an ipv4 address, but received an observed address
779-
// that is ivp6 then the address is an IPv4-Mapped IPv6 Addresses
780-
let val = val.map(|val| SocketAddr::new(val.ip().to_canonical(), val.port()));
781-
let latency = conn2.rtt();
782-
trace!(?val, ?relay_addr, ?latency, "got addr V6");
783-
if ob
784-
.set(val.map(|addr| QadProbeReport {
785-
node: node.clone(),
786-
addr,
787-
latency,
788-
}))
789-
.is_err()
790-
{
791-
// cancel if the observer is gone
792-
break;
793-
}
794-
if receiver.changed().await.is_err() {
795-
break;
771+
let handle = task::spawn({
772+
let observer = observer.clone();
773+
let conn = conn.clone();
774+
async move {
775+
loop {
776+
let val = *receiver.borrow();
777+
// if we've sent to an ipv4 address, but received an observed address
778+
// that is ivp6 then the address is an [IPv4-Mapped IPv6 Addresses](https://doc.rust-lang.org/beta/std/net/struct.Ipv6Addr.html#ipv4-mapped-ipv6-addresses)
779+
let val = val.map(|val| SocketAddr::new(val.ip().to_canonical(), val.port()));
780+
let latency = conn.rtt();
781+
trace!(?val, ?relay_addr, ?latency, "got addr V6");
782+
observer
783+
.set(val.map(|addr| QadProbeReport {
784+
node: node.clone(),
785+
addr,
786+
latency,
787+
}))
788+
.ok();
789+
if receiver.changed().await.is_err() {
790+
break;
791+
}
796792
}
797793
}
798794
});

0 commit comments

Comments
 (0)