Skip to content

Commit fcfa086

Browse files
Latch the send_time at the beginning of the write loop
This will result in BBR sent packets having the correct send times in cases where multiple packets are emitted in a burst, and result in correct minRTT calculations. The next release time is computed by QUIC connection path since each path has it's own congestion controller. Ideally sends should be done in next release time order for all the paths to avoid generating packets on slower paths first.
1 parent 9b4af0d commit fcfa086

File tree

7 files changed

+283
-76
lines changed

7 files changed

+283
-76
lines changed

quiche/src/lib.rs

Lines changed: 147 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3833,6 +3833,91 @@ impl<F: BufFactory> Connection<F> {
38333833
pub fn send_on_path(
38343834
&mut self, out: &mut [u8], from: Option<SocketAddr>,
38353835
to: Option<SocketAddr>,
3836+
) -> Result<(usize, SendInfo)> {
3837+
let path_id = match (from, to) {
3838+
(Some(f), Some(t)) => self
3839+
.paths
3840+
.path_id_from_addrs(&(f, t))
3841+
.ok_or(Error::InvalidState)?,
3842+
3843+
_ => self.get_send_path_id(from, to)?,
3844+
};
3845+
3846+
let now = Instant::now();
3847+
3848+
self.send_with_path_id(out, PathId { path_id }, now)
3849+
}
3850+
3851+
/// Writes a single QUIC packet to be sent to the peer from the
3852+
/// specified path_id. Call `paths_id_iter()` to get a list of
3853+
/// path_ids to call `send_with_path_id()` on.
3854+
///
3855+
/// On success the number of bytes written to the output buffer is
3856+
/// returned, or [`Done`] if there was nothing to write.
3857+
///
3858+
/// The application should call `send_with_path_id()` multiple times until
3859+
/// [`Done`] is returned, indicating that there are no more packets to
3860+
/// send. It is recommended that `send_with_path_id()` be called in the
3861+
/// following cases:
3862+
///
3863+
/// * When the application receives QUIC packets from the peer (that is,
3864+
/// any time [`recv()`] is also called).
3865+
///
3866+
/// * When the connection timer expires (that is, any time [`on_timeout()`]
3867+
/// is also called).
3868+
///
3869+
/// * When the application sends data to the peer (for examples, any time
3870+
/// [`stream_send()`] or [`stream_shutdown()`] are called).
3871+
///
3872+
/// * When the application receives data from the peer (for example any
3873+
/// time [`stream_recv()`] is called).
3874+
///
3875+
/// Once [`is_draining()`] returns `true`, it is no longer necessary to call
3876+
/// `send_with_path_id()` and all calls will return [`Done`].
3877+
///
3878+
/// [`Done`]: enum.Error.html#variant.Done
3879+
/// [`InvalidState`]: enum.Error.html#InvalidState
3880+
/// [`recv()`]: struct.Connection.html#method.recv
3881+
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
3882+
/// [`stream_send()`]: struct.Connection.html#method.stream_send
3883+
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
3884+
/// [`stream_recv()`]: struct.Connection.html#method.stream_recv
3885+
/// [`path_event_next()`]: struct.Connection.html#method.path_event_next
3886+
/// [`paths_iter()`]: struct.Connection.html#method.paths_iter
3887+
/// [`is_draining()`]: struct.Connection.html#method.is_draining
3888+
///
3889+
/// ## Examples:
3890+
///
3891+
/// ```no_run
3892+
/// # let mut out = [0; 512];
3893+
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3894+
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3895+
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
3896+
/// # let peer = "127.0.0.1:1234".parse().unwrap();
3897+
/// # let local = socket.local_addr().unwrap();
3898+
/// # let mut conn = quiche::accept(&scid, None, local, peer, &mut config)?;
3899+
/// loop {
3900+
/// for path_id in self.paths_id_iter(local) {
3901+
/// let (write, send_info) = match conn.send_with_path_id(&mut out, path_id, Instant::now()) {
3902+
/// Ok(v) => v,
3903+
///
3904+
/// Err(quiche::Error::Done) => {
3905+
/// // Done writing.
3906+
/// break;
3907+
/// },
3908+
///
3909+
/// Err(e) => {
3910+
/// // An error occurred, handle it.
3911+
/// break;
3912+
/// },
3913+
/// };
3914+
///
3915+
/// socket.send_to(&out[..write], &send_info.to).unwrap();
3916+
/// }
3917+
/// # Ok::<(), quiche::Error>(())
3918+
/// ```
3919+
pub fn send_with_path_id(
3920+
&mut self, out: &mut [u8], path_id: PathId, now: Instant,
38363921
) -> Result<(usize, SendInfo)> {
38373922
if out.is_empty() {
38383923
return Err(Error::BufferTooShort);
@@ -3842,8 +3927,6 @@ impl<F: BufFactory> Connection<F> {
38423927
return Err(Error::Done);
38433928
}
38443929

3845-
let now = Instant::now();
3846-
38473930
if self.local_error.is_none() {
38483931
self.do_handshake(now)?;
38493932
}
@@ -3870,15 +3953,7 @@ impl<F: BufFactory> Connection<F> {
38703953
// maximum UDP payload size limit.
38713954
let mut left = cmp::min(out.len(), self.max_send_udp_payload_size());
38723955

3873-
let send_pid = match (from, to) {
3874-
(Some(f), Some(t)) => self
3875-
.paths
3876-
.path_id_from_addrs(&(f, t))
3877-
.ok_or(Error::InvalidState)?,
3878-
3879-
_ => self.get_send_path_id(from, to)?,
3880-
};
3881-
3956+
let send_pid = path_id.path_id;
38823957
let send_path = self.paths.get_mut(send_pid)?;
38833958

38843959
// Update max datagram size to allow path MTU discovery probe to be sent.
@@ -3938,13 +4013,6 @@ impl<F: BufFactory> Connection<F> {
39384013
break;
39394014
}
39404015
}
3941-
3942-
// Don't coalesce packets that must go on different paths.
3943-
if !(from.is_some() && to.is_some()) &&
3944-
self.get_send_path_id(from, to)? != send_pid
3945-
{
3946-
break;
3947-
}
39484016
}
39494017

39504018
if done == 0 {
@@ -5226,18 +5294,45 @@ impl<F: BufFactory> Connection<F> {
52265294
Ok(())
52275295
}
52285296

5229-
/// Returns the desired send time for the next packet.
5297+
/// Returns the desired send time for the next packet on the desired path.
52305298
#[inline]
5231-
pub fn get_next_release_time(&self) -> Option<ReleaseDecision> {
5299+
pub fn get_next_release_time(
5300+
&self, path_id: PathId,
5301+
) -> Option<ReleaseDecision> {
52325302
Some(
52335303
self.paths
5234-
.get_active()
5304+
.get(path_id.path_id)
52355305
.ok()?
52365306
.recovery
52375307
.get_next_release_time(),
52385308
)
52395309
}
52405310

5311+
/// Latches the packet send time for the selected path. The desired send
5312+
/// time must be >= now. Send time remains latched until cleared.
5313+
pub fn latch_packet_send_time(
5314+
&mut self, path_id: PathId, send_time: Instant,
5315+
) -> Result<()> {
5316+
self.paths
5317+
.get_mut(path_id.path_id)?
5318+
.recovery
5319+
.latch_packet_send_time(send_time);
5320+
5321+
Ok(())
5322+
}
5323+
5324+
/// Clear the latched packet send time on the selected path.
5325+
pub fn clear_latched_packet_send_time(
5326+
&mut self, path_id: PathId,
5327+
) -> Result<()> {
5328+
self.paths
5329+
.get_mut(path_id.path_id)?
5330+
.recovery
5331+
.clear_latched_packet_send_time();
5332+
5333+
Ok(())
5334+
}
5335+
52415336
/// Returns whether gcongestion is enabled.
52425337
#[inline]
52435338
pub fn gcongestion_enabled(&self) -> Option<bool> {
@@ -6950,6 +7045,35 @@ impl<F: BufFactory> Connection<F> {
69507045
}
69517046
}
69527047

7048+
/// Returns an iterator over `PathId`s whose association with `from` forms
7049+
/// a known QUIC path on which packets can be sent to.
7050+
///
7051+
/// This function is typically used in combination with [`send_with_path_id()`].
7052+
///
7053+
/// Note that the iterator includes all the possible combination of
7054+
/// destination `PathId`s, even those whose sending is not required now.
7055+
/// In other words, this is another way for the application to recall from
7056+
/// past [`PathEvent::New`] events.
7057+
///
7058+
/// [`PathEvent::New`]: enum.PathEvent.html#variant.New
7059+
/// [`send_with_path_id()`]: struct.Connection.html#method.send_with_path_id
7060+
pub fn paths_id_iter(&self, from: SocketAddr) -> PathIdIter {
7061+
// Instead of trying to identify whether packets will be sent on the
7062+
// given 4-tuple, simply filter paths that cannot be used.
7063+
PathIdIter {
7064+
paths: self
7065+
.paths
7066+
.iter()
7067+
.filter(|(_, p)| p.active_dcid_seq.is_some())
7068+
.filter(|(_, p)| p.usable() || p.probing_required())
7069+
.filter(|(_, p)| p.local_addr() == from)
7070+
.map(|(path_id, _p)| PathId { path_id })
7071+
.collect(),
7072+
7073+
index: 0,
7074+
}
7075+
}
7076+
69537077
/// Closes the connection with the given error and reason.
69547078
///
69557079
/// The `app` parameter specifies whether an application close should be
@@ -9417,6 +9541,8 @@ pub use crate::packet::Header;
94179541
pub use crate::packet::Type;
94189542

94199543
pub use crate::path::PathEvent;
9544+
pub use crate::path::PathId;
9545+
pub use crate::path::PathIdIter;
94209546
pub use crate::path::PathStats;
94219547
pub use crate::path::SocketAddrIter;
94229548

quiche/src/path.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,37 @@ impl Path {
569569
}
570570
}
571571

572+
/// Opaque identifier for a Path on a QUIC connection.
573+
#[derive(Clone, Copy, Debug)]
574+
pub struct PathId {
575+
pub(crate) path_id: usize,
576+
}
577+
578+
/// An iterator over PathId.
579+
#[derive(Default)]
580+
pub struct PathIdIter {
581+
pub(crate) paths: SmallVec<[PathId; 8]>,
582+
pub(crate) index: usize,
583+
}
584+
585+
impl Iterator for PathIdIter {
586+
type Item = PathId;
587+
588+
#[inline]
589+
fn next(&mut self) -> Option<Self::Item> {
590+
let v = self.paths.get(self.index)?;
591+
self.index += 1;
592+
Some(*v)
593+
}
594+
}
595+
596+
impl ExactSizeIterator for PathIdIter {
597+
#[inline]
598+
fn len(&self) -> usize {
599+
self.paths.len() - self.index
600+
}
601+
}
602+
572603
/// An iterator over SocketAddr.
573604
#[derive(Default)]
574605
pub struct SocketAddrIter {

quiche/src/recovery/congestion/recovery.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,13 @@ impl RecoveryOps for LegacyRecovery {
631631
now
632632
}
633633

634+
fn latch_packet_send_time(&mut self, _now: Instant) {
635+
// Not needed because recovery algorithms in this directory do
636+
// not implement pacing.
637+
}
638+
639+
fn clear_latched_packet_send_time(&mut self) {}
640+
634641
// `peer_sent_ack_ranges` should not be used without validation.
635642
fn on_ack_received(
636643
&mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64, epoch: Epoch,

quiche/src/recovery/gcongestion/recovery.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,8 @@ pub struct GRecovery {
465465
lost_reuse: Vec<Lost>,
466466

467467
pacer: Pacer,
468+
469+
latched_packet_send_time: Option<Instant>,
468470
}
469471

470472
impl GRecovery {
@@ -518,6 +520,7 @@ impl GRecovery {
518520

519521
newly_acked: Vec::new(),
520522
lost_reuse: Vec::new(),
523+
latched_packet_send_time: None,
521524
})
522525
}
523526

@@ -688,7 +691,7 @@ impl RecoveryOps for GRecovery {
688691
&mut self, pkt: Sent, epoch: packet::Epoch,
689692
handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
690693
) {
691-
let time_sent = self.get_next_release_time().time(now).unwrap_or(now);
694+
let time_sent = self.get_packet_send_time(now);
692695

693696
let epoch = &mut self.epochs[epoch];
694697

@@ -749,7 +752,18 @@ impl RecoveryOps for GRecovery {
749752
}
750753

751754
fn get_packet_send_time(&self, now: Instant) -> Instant {
752-
self.pacer.get_next_release_time().time(now).unwrap_or(now)
755+
// TODO debug_assert that latched time >= now.
756+
self.latched_packet_send_time
757+
.filter(|v| *v > now)
758+
.unwrap_or(now)
759+
}
760+
761+
fn latch_packet_send_time(&mut self, send_time: Instant) {
762+
self.latched_packet_send_time = Some(send_time);
763+
}
764+
765+
fn clear_latched_packet_send_time(&mut self) {
766+
self.latched_packet_send_time = None;
753767
}
754768

755769
// `peer_sent_ack_ranges` should not be used without validation.
@@ -802,6 +816,7 @@ impl RecoveryOps for GRecovery {
802816
has_ack_eliciting;
803817
if update_rtt {
804818
let latest_rtt = now - largest_newly_acked.time_sent;
819+
805820
self.rtt_stats.update_rtt(
806821
latest_rtt,
807822
Duration::from_micros(ack_delay),

0 commit comments

Comments
 (0)