22// SPDX-License-Identifier: Apache-2.0
33
44use crate :: { perf, tls, Result } ;
5- use futures:: future:: try_join_all;
65use s2n_quic:: { client, provider:: io, Client , Connection } ;
76use std:: path:: PathBuf ;
87use structopt:: StructOpt ;
8+ use tokio:: task:: JoinSet ;
99
1010#[ derive( Debug , StructOpt ) ]
1111pub struct Perf {
@@ -44,6 +44,9 @@ pub struct Perf {
4444 #[ structopt( long, default_value) ]
4545 receive : u64 ,
4646
47+ #[ structopt( long, default_value = "1" ) ]
48+ streams : u64 ,
49+
4750 #[ structopt( flatten) ]
4851 limits : perf:: Limits ,
4952
@@ -56,7 +59,7 @@ impl Perf {
5659 pub async fn run ( & self ) -> Result < ( ) > {
5760 let mut client = self . client ( ) ?;
5861
59- let mut requests = vec ! [ ] ;
62+ let mut requests = JoinSet :: new ( ) ;
6063
6164 // TODO support a richer connection strategy
6265 for _ in 0 ..self . connections . unwrap_or ( 1 ) {
@@ -69,37 +72,45 @@ impl Perf {
6972 }
7073 let connection = client. connect ( connect) . await ?;
7174
72- requests. push ( handle_connection ( connection, self . send , self . receive ) ) ;
75+ requests. spawn ( handle_connection (
76+ connection,
77+ self . streams ,
78+ self . send ,
79+ self . receive ,
80+ ) ) ;
7381 }
7482
75- try_join_all ( requests) . await ?;
83+ // wait until all of the connections finish before closing
84+ while requests. join_next ( ) . await . is_some ( ) { }
85+
7686 client. wait_idle ( ) . await ?;
7787
7888 return Ok ( ( ) ) ;
7989
8090 async fn handle_connection (
8191 mut connection : Connection ,
92+ streams : u64 ,
8293 send : u64 ,
8394 receive : u64 ,
8495 ) -> Result < ( ) > {
8596 if send == 0 && receive == 0 {
8697 return Ok ( ( ) ) ;
8798 }
8899
89- let stream = connection. open_bidirectional_stream ( ) . await ?;
90- let ( receive_stream, mut send_stream) = stream. split ( ) ;
100+ for _ in 0 ..streams {
101+ let stream = connection. open_bidirectional_stream ( ) . await ?;
102+ let ( receive_stream, mut send_stream) = stream. split ( ) ;
91103
92- let s = tokio :: spawn ( async move {
93- perf:: write_stream_size ( & mut send_stream, receive) . await ?;
94- perf:: handle_send_stream ( send_stream, send) . await ?;
95- <Result < ( ) > >:: Ok ( ( ) )
96- } ) ;
104+ let s = async move {
105+ perf:: write_stream_size ( & mut send_stream, receive) . await ?;
106+ perf:: handle_send_stream ( send_stream, send) . await ?;
107+ <Result < ( ) > >:: Ok ( ( ) )
108+ } ;
97109
98- let r = tokio :: spawn ( perf:: handle_receive_stream ( receive_stream) ) ;
110+ let r = perf:: handle_receive_stream ( receive_stream) ;
99111
100- let ( s, r) = tokio:: try_join!( s, r) ?;
101- s?;
102- r?;
112+ let _ = tokio:: try_join!( s, r) ?;
113+ }
103114
104115 Ok ( ( ) )
105116 }
0 commit comments