Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions quic/s2n-quic-qns/src/client/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{perf, tls, Result};
use futures::future::try_join_all;
use s2n_quic::{client, provider::io, Client, Connection};
use std::path::PathBuf;
use structopt::StructOpt;
use tokio::task::JoinSet;

#[derive(Debug, StructOpt)]
pub struct Perf {
Expand Down Expand Up @@ -44,6 +44,9 @@ pub struct Perf {
#[structopt(long, default_value)]
receive: u64,

#[structopt(long, default_value = "1")]
streams: u64,

#[structopt(flatten)]
limits: perf::Limits,

Expand All @@ -56,7 +59,7 @@ impl Perf {
pub async fn run(&self) -> Result<()> {
let mut client = self.client()?;

let mut requests = vec![];
let mut requests = JoinSet::new();

// TODO support a richer connection strategy
for _ in 0..self.connections.unwrap_or(1) {
Expand All @@ -69,37 +72,45 @@ impl Perf {
}
let connection = client.connect(connect).await?;

requests.push(handle_connection(connection, self.send, self.receive));
requests.spawn(handle_connection(
connection,
self.streams,
self.send,
self.receive,
));
}

try_join_all(requests).await?;
// wait until all of the connections finish before closing
while requests.join_next().await.is_some() {}

client.wait_idle().await?;

return Ok(());

async fn handle_connection(
mut connection: Connection,
streams: u64,
send: u64,
receive: u64,
) -> Result<()> {
if send == 0 && receive == 0 {
return Ok(());
}

let stream = connection.open_bidirectional_stream().await?;
let (receive_stream, mut send_stream) = stream.split();
for _ in 0..streams {
let stream = connection.open_bidirectional_stream().await?;
let (receive_stream, mut send_stream) = stream.split();

let s = tokio::spawn(async move {
perf::write_stream_size(&mut send_stream, receive).await?;
perf::handle_send_stream(send_stream, send).await?;
<Result<()>>::Ok(())
});
let s = async move {
perf::write_stream_size(&mut send_stream, receive).await?;
perf::handle_send_stream(send_stream, send).await?;
<Result<()>>::Ok(())
};

let r = tokio::spawn(perf::handle_receive_stream(receive_stream));
let r = perf::handle_receive_stream(receive_stream);

let (s, r) = tokio::try_join!(s, r)?;
s?;
r?;
let _ = tokio::try_join!(s, r)?;
}

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions quic/s2n-quic-qns/src/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::{

/// Drains a receive stream
pub async fn handle_receive_stream(mut stream: ReceiveStream) -> Result<()> {
let mut chunks = vec![Bytes::new(); 64];
let mut chunks: [_; 64] = core::array::from_fn(|_| Bytes::new());

loop {
let (len, is_open) = stream.receive_vectored(&mut chunks).await?;
Expand All @@ -35,7 +35,7 @@ pub async fn handle_receive_stream(mut stream: ReceiveStream) -> Result<()> {

/// Sends a specified amount of data on a send stream
pub async fn handle_send_stream(mut stream: SendStream, len: u64) -> Result<()> {
let mut chunks = vec![Bytes::new(); 64];
let mut chunks: [_; 64] = core::array::from_fn(|_| Bytes::new());

//= https://tools.ietf.org/id/draft-banks-quic-performance-00#4.1
//# Since the goal here is to measure the efficiency of the QUIC
Expand Down