
Conversation


@camshaft camshaft commented Mar 27, 2025

Description of changes:

This change adds a client RPC API for streams.

    let response = client.rpc(server_addr, request, response_collector).await?;

This is significantly more efficient than the standard stream interface because:

  • No need for an empty "prelude" packet since the request data is all provided up front

  • Stream packets indicate the final stream size from the very first packet, so the client doesn't need to send an empty "fin" packet.

  • Avoiding application deadlock when the client sends a large request and reads back a large response. Consider the following example:

    // client
    stream.write_all(&large_payload).await.unwrap();

    // server
    let mut chunk = vec![];
    loop {
        let len = stream.read_buf(&mut chunk).await.unwrap();
        if len == 0 {
            break;
        }
        stream.write_all(&chunk[..len]).await.unwrap();
        chunk.clear();
    }

    Beyond a certain request size, this deadlocks: the client never reads the response while the server is blocked on flow control. The RPC API avoids this issue by polling both the read and write futures concurrently.

In the future we can potentially optimize things more since we're operating at a higher level interface with a lot more knowledge of what the application is wanting to do.

Call-outs:

I had to do a couple of refactors around stream states that were leading to suboptimal packet patterns. This is what the packet capture looked like before my changes:

[packet capture screenshot before the change: 2025-03-27-180332_1241x202_scrot]

And with these changes we went from 11 to 4:

[packet capture screenshot after the change: 2025-03-27-180410_1130x84_scrot]

In theory, we could probably get this down to 3 by having the server send both the stream and control packets in a single datagram, but that's complicated to do right now since the two stream halves operate mostly independently.

Testing:

I added some tests showing everything working. I also have a pending change in bach that makes it possible to monitor the network and make assertions about which packets are sent; one test asserts that the total number of packets is 4. Another test injects loss and shows that things still work, but it currently fails when the first client packet is lost, so I will follow up with a PR to fix that.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@camshaft camshaft marked this pull request as ready for review March 28, 2025 00:09
Comment on lines +35 to +37
if !storage.has_remaining_capacity() {
    return Err(io::Error::new(io::ErrorKind::Other, "the provided response buffer failed to provide enough capacity for the peer's response"));
}

nit:

Suggested change
if !storage.has_remaining_capacity() {
    return Err(io::Error::new(io::ErrorKind::Other, "the provided response buffer failed to provide enough capacity for the peer's response"));
}
ensure!(storage.has_remaining_capacity(), Err(io::Error::new(io::ErrorKind::Other, "the provided response buffer failed to provide enough capacity for the peer's response")));
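For context, `ensure!`-style macros early-return a value when a condition fails, collapsing the three-line guard into one. A minimal sketch of such a macro (a simplification; the actual macro in the codebase may differ) is:

```rust
use std::io;

// Minimal ensure!-style macro: early-returns the given value when the
// condition is false. (A sketch, not s2n-quic-dc's real definition.)
macro_rules! ensure {
    ($cond:expr, $ret:expr) => {
        if !($cond) {
            return $ret;
        }
    };
}

// Hypothetical helper standing in for the capacity check in the PR.
fn check_capacity(remaining: usize) -> io::Result<()> {
    ensure!(
        remaining > 0,
        Err(io::Error::new(
            io::ErrorKind::Other,
            "the provided response buffer failed to provide enough capacity for the peer's response",
        ))
    );
    Ok(())
}

fn main() {
    assert!(check_capacity(16).is_ok());
    assert!(check_capacity(0).is_err());
    println!("ensure! guard behaves as expected");
}
```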

Contributor Author

Good call. I'll fix it in that follow up

@camshaft camshaft merged commit 1473a86 into main Mar 28, 2025
126 checks passed
@camshaft camshaft deleted the camshaft/dc-rpc branch March 28, 2025 01:09
@camshaft camshaft requested a review from Copilot March 28, 2025 01:10
Copilot AI left a comment

Pull Request Overview

This PR introduces an optimized RPC API for streams to allow a more efficient request/response mechanism without the overhead of extra packets and potential deadlock scenarios.

  • Adds new tests and client RPC implementations to verify RPC behavior under normal conditions and packet loss.
  • Refactors connection and handshake handling across the client, stream testing, and shared modules to support the new RPC API.
  • Updates auxiliary modules and configuration (e.g. Clippy MSRV bump) to align with the changes.

Reviewed Changes

Copilot reviewed 15 out of 17 changed files in this pull request and generated 1 comment.

File Description
dc/s2n-quic-dc/src/stream/tests/rpc.rs New tests for the RPC stream API
dc/s2n-quic-dc/src/stream/tests/deterministic.rs Updated connection API usage in deterministic tests
dc/s2n-quic-dc/src/stream/tests.rs Included new RPC module
dc/s2n-quic-dc/src/stream/testing.rs Refactored connection and RPC helper functions
dc/s2n-quic-dc/src/stream/shared/handshake.rs Added handshake state machine definitions
dc/s2n-quic-dc/src/stream/shared.rs Updated handshake handling for stream state transitions
dc/s2n-quic-dc/src/stream/send/worker.rs Adjusted handshake initialization and state transition triggering
dc/s2n-quic-dc/src/stream/send/state.rs Updated use of div_ceil and refactored finish logic
dc/s2n-quic-dc/src/stream/send/application/builder.rs Renamed field from 'open' to 'status' to improve clarity
dc/s2n-quic-dc/src/stream/send/application.rs Updated stream status management and shutdown logic
dc/s2n-quic-dc/src/stream/recv/shared.rs Modified handshake integration in the receiver side
dc/s2n-quic-dc/src/stream/endpoint.rs Provided endpoint type to the reader for improved initialization
dc/s2n-quic-dc/src/stream/client/rpc.rs New client RPC implementation leveraging stream split
dc/s2n-quic-dc/src/stream/client.rs Exported the new RPC module
.clippy.toml Updated minimum supported Rust version to 1.75.0
Files not reviewed (2)
  • dc/s2n-quic-dc/src/stream/shared/snapshots/s2n_quic_dc__stream__shared__handshake__tests__dot_test.snap: Language not supported
  • dc/s2n-quic-dc/src/stream/shared/snapshots/s2n_quic_dc__stream__shared__handshake__tests__snapshots.snap: Language not supported
Comments suppressed due to low confidence (2)

dc/s2n-quic-dc/src/stream/recv/shared.rs:584

  • Consider handling the return value of 'handshake.on_non_zero_next_expected_control_packet()' to verify that the state transition occurs as expected and to handle potential errors.
if packet.next_expected_control_packet().as_u64() > 0 {
    let _ = self.handshake.on_non_zero_next_expected_control_packet();
}

dc/s2n-quic-dc/src/stream/client/rpc.rs:36

  • [nitpick] Consider simplifying the error message to improve clarity, for example: 'Insufficient response buffer capacity'.
return Err(io::Error::new(io::ErrorKind::Other, "the provided response buffer failed to provide enough capacity for the peer's response"));

if res.is_ok() {
    self.any_valid_packets = true;
    self.remote_addr = *remote_addr;
    let _ = self.handshake.on_control_packet();
Copilot AI Mar 28, 2025

Consider handling the result of 'handshake.on_control_packet()' instead of discarding it, to ensure that handshake state transitions are correctly applied and any errors are surfaced.

Suggested change
let _ = self.handshake.on_control_packet();
if let Err(e) = self.handshake.on_control_packet() {
    return Err(e.into());
}
