Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ incremental = false
lto = true
codegen-units = 1
incremental = false
# improve flamegraph information
debug = true

[profile.fuzz]
inherits = "dev"
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license = "Apache-2.0"
publish = false

[dependencies]
bytes = "1"
criterion = { version = "0.4", features = ["html_reports"] }
crossbeam-channel = { version = "0.5" }
internet-checksum = "0.2"
Expand Down
28 changes: 26 additions & 2 deletions quic/s2n-quic-bench/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn benchmarks(c: &mut Criterion) {
let len = VarInt::new(input.len() as _).unwrap();
b.iter(move || {
buffer.write_at(offset, input).unwrap();
while buffer.pop().is_some() {}
buffer.copy_into_buf(&mut NoOpBuf);
offset += len;
});
});
Expand All @@ -44,10 +44,34 @@ pub fn benchmarks(c: &mut Criterion) {
buffer.write_at(first_offset, input).unwrap();
let second_offset = offset;
buffer.write_at(second_offset, input).unwrap();
while buffer.pop().is_some() {}
buffer.copy_into_buf(&mut NoOpBuf);
offset = first_offset + len;
});
},
);
}
}

/// A BufMut implementation that doesn't actually copy data into it
///
/// This is used to avoid oversampling the `pop` implementation for
/// `write_at` benchmarks.
struct NoOpBuf;

unsafe impl bytes::BufMut for NoOpBuf {
#[inline]
fn remaining_mut(&self) -> usize {
usize::MAX
}

#[inline]
unsafe fn advance_mut(&mut self, _cnt: usize) {}

#[inline]
fn put_slice(&mut self, _slice: &[u8]) {}

#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
unimplemented!()
}
}
91 changes: 78 additions & 13 deletions quic/s2n-quic-core/src/buffer/receive_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ impl ReceiveBuffer {
/// Returns true if no bytes are available for reading
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
if let Some(slot) = self.slots.front() {
!slot.is_occupied(self.start_offset)
} else {
true
}
}

/// Returns the number of bytes and chunks available for consumption
Expand Down Expand Up @@ -364,11 +368,13 @@ impl ReceiveBuffer {
#[inline]
pub fn pop(&mut self) -> Option<BytesMut> {
self.pop_transform(|buffer, is_final_offset| {
if is_final_offset || buffer.len() == buffer.capacity() {
let chunk = if is_final_offset || buffer.len() == buffer.capacity() {
core::mem::take(buffer)
} else {
buffer.split()
}
};
let len = chunk.len();
(chunk, len)
})
}

Expand All @@ -381,23 +387,65 @@ impl ReceiveBuffer {
let watermark = watermark.min(buffer.len());

// if the watermark is 0 then don't needlessly increment refcounts
ensure!(watermark > 0, BytesMut::new());
ensure!(watermark > 0, (BytesMut::new(), 0));

if watermark == buffer.len() && is_final_offset {
return core::mem::take(buffer);
return (core::mem::take(buffer), watermark);
}

buffer.split_to(watermark)
(buffer.split_to(watermark), watermark)
})
}

/// Copies all the available buffered data into the provided `buf`'s remaining capacity.
///
/// This method is slightly more efficient than [`Self::pop`] when the caller ends up copying
/// the buffered data into another slice, since it avoids a refcount increment/decrement on
/// the contained [`BytesMut`].
///
/// The total number of bytes copied is returned.
#[inline]
pub fn copy_into_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> usize {
use bytes::Buf;

let mut total = 0;

loop {
let remaining = buf.remaining_mut();
// ensure we have enough capacity in the destination buf
ensure!(remaining > 0, total);

let transform = |buffer: &mut BytesMut, is_final_offset| {
let watermark = buffer.len().min(remaining);

// copy bytes into the destination buf
buf.put_slice(&buffer[..watermark]);
// advance the chunk rather than splitting to avoid refcount churn
buffer.advance(watermark);
total += watermark;

(is_final_offset, watermark)
};

match self.pop_transform(transform) {
// if we're at the final offset, then no need to keep iterating
Some(true) => break,
Some(false) => continue,
// no more available chunks
None => break,
}
}

total
}

/// Pops a buffer from the front of the receive queue as long as the `transform` function returns a
/// non-empty buffer.
#[inline]
fn pop_transform<F: Fn(&mut BytesMut, bool) -> BytesMut>(
fn pop_transform<F: FnOnce(&mut BytesMut, bool) -> (O, usize), O>(
&mut self,
transform: F,
) -> Option<BytesMut> {
) -> Option<O> {
let slot = self.slots.front_mut()?;

// make sure the slot has some data
Expand All @@ -406,19 +454,19 @@ impl ReceiveBuffer {
let is_final_offset = self.final_offset == slot.end();
let buffer = slot.data_mut();

let out = transform(buffer, is_final_offset);
let (out, len) = transform(buffer, is_final_offset);

// filter out empty buffers
ensure!(!out.is_empty(), None);
ensure!(len > 0, None);

slot.add_start(out.len());
slot.add_start(len);

if slot.should_drop() {
// remove empty buffers
self.slots.pop_front();
}

self.start_offset += out.len() as u64;
self.start_offset += len as u64;

self.invariants();

Expand All @@ -438,7 +486,14 @@ impl ReceiveBuffer {
/// buffered and available for consumption.
#[inline]
pub fn total_received_len(&self) -> u64 {
self.consumed_len() + self.len() as u64
let mut offset = self.start_offset;

for slot in &self.slots {
ensure!(slot.is_occupied(offset), offset);
offset = slot.end();
}

offset
}

/// Resets the receive buffer.
Expand Down Expand Up @@ -581,6 +636,16 @@ impl ReceiveBuffer {
#[inline(always)]
fn invariants(&self) {
if cfg!(debug_assertions) {
assert_eq!(
self.total_received_len(),
self.consumed_len() + self.len() as u64
);

let (actual_len, chunks) = self.report();

assert_eq!(actual_len == 0, self.is_empty());
assert_eq!(self.iter().count(), chunks);

let mut prev_end = self.start_offset;

for slot in &self.slots {
Expand Down
25 changes: 22 additions & 3 deletions quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ fn write_and_pop() {
assert_eq!(offset, popped_bytes);
}

#[test]
#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time
fn write_and_copy_into_buf() {
let mut buffer = ReceiveBuffer::new();
let mut offset = VarInt::default();
let mut output = vec![];
for len in 0..10000 {
let chunk = Data::send_one_at(offset.as_u64(), len);
buffer.write_at(offset, &chunk).unwrap();
offset += chunk.len();
let copied_len = buffer.copy_into_buf(&mut output);
assert_eq!(copied_len, chunk.len());
assert_eq!(&output[..], &chunk[..]);
output.clear();
}
}

fn new_receive_buffer() -> ReceiveBuffer {
let buffer = ReceiveBuffer::new();
assert_eq!(buffer.len(), 0);
Expand Down Expand Up @@ -849,9 +866,11 @@ fn write_partial_fin_test() {
let mut allocated_len = 0;

// use pop_transform so we can take the entire buffer and get an accurate `capacity` value
while let Some(chunk) =
buf.pop_transform(|chunk, _is_final_chunk| core::mem::take(chunk))
{
while let Some(chunk) = buf.pop_transform(|chunk, _is_final_chunk| {
let chunk = core::mem::take(chunk);
let len = chunk.len();
(chunk, len)
}) {
actual_len += chunk.len();
allocated_len += chunk.capacity();
chunks.push(chunk);
Expand Down
4 changes: 2 additions & 2 deletions scripts/perf/build
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ if [ ! -f target/perf/quinn/bin/perf_client ] || [ ! -f target/perf/quinn/bin/pe
perf
fi

RUSTFLAGS="-g --cfg s2n_quic_unstable" cargo \
cargo \
+stable \
build \
--bin s2n-quic-qns \
--release
--profile bench