Skip to content

Commit 1b3acb1

Browse files
committed
refactor(net): replace rx_deferred_frame
Before the `readv` change, `rx_deferred_frame` was used to signal whether a packet was present in the internal rx buffer, so we would try to send it first before attempting to read a new one. With `readv` we no longer use an internal buffer for RX packets, so the logic of a `deferred` packet has changed. Now any packet is `deferred` when it is written to guest memory. After the write we try to notify the guest about it, but only if the rate limiter allows it. To track this we now use `deferred_rx_bytes`, which holds the number of bytes we have yet to notify the guest about. We can conveniently use `Option<NonZeroUsize>` for this, where `Some(...)` means there are some `deferred` bytes we have yet to notify the guest about, and `None` means we are done and can read the next packet. Signed-off-by: Egor Lazarchuk <[email protected]>
1 parent 1486edb commit 1b3acb1

File tree

1 file changed

+44
-135
lines changed

1 file changed

+44
-135
lines changed

src/vmm/src/devices/virtio/net/device.rs

Lines changed: 44 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
use std::mem;
99
use std::net::Ipv4Addr;
10+
use std::num::NonZeroUsize;
1011
use std::sync::{Arc, Mutex};
1112

1213
use libc::EAGAIN;
@@ -107,9 +108,9 @@ pub struct Net {
107108
pub(crate) rx_rate_limiter: RateLimiter,
108109
pub(crate) tx_rate_limiter: RateLimiter,
109110

110-
pub(crate) rx_deferred_frame: bool,
111-
112-
rx_bytes_read: usize,
111+
/// Used to store last RX packet size and
112+
/// rate limit RX queue.
113+
deferred_rx_bytes: Option<NonZeroUsize>,
113114
rx_frame_buf: [u8; MAX_BUFFER_SIZE],
114115

115116
tx_frame_headers: [u8; frame_hdr_len()],
@@ -176,8 +177,7 @@ impl Net {
176177
queue_evts,
177178
rx_rate_limiter,
178179
tx_rate_limiter,
179-
rx_deferred_frame: false,
180-
rx_bytes_read: 0,
180+
deferred_rx_bytes: None,
181181
rx_frame_buf: [0u8; MAX_BUFFER_SIZE],
182182
tx_frame_headers: [0u8; frame_hdr_len()],
183183
irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?,
@@ -299,16 +299,22 @@ impl Net {
299299
// Attempts to copy a single frame into the guest if there is enough
300300
// rate limiting budget.
301301
// Returns true on successful frame delivery.
302-
fn rate_limited_rx_single_frame(&mut self) -> bool {
303-
let rx_queue = &mut self.queues[RX_INDEX];
304-
if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) {
305-
self.metrics.rx_rate_limiter_throttled.inc();
306-
return false;
302+
fn send_deferred_rx_bytes(&mut self) -> bool {
303+
match self.deferred_rx_bytes {
304+
Some(bytes) => {
305+
if Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, bytes.get() as u64) {
306+
// The packet is good to go, reset `deferred_rx_bytes`.
307+
self.deferred_rx_bytes = None;
308+
// Attempt frame delivery.
309+
self.rx_buffer.notify_queue(&mut self.queues[RX_INDEX]);
310+
true
311+
} else {
312+
self.metrics.rx_rate_limiter_throttled.inc();
313+
false
314+
}
315+
}
316+
None => true,
307317
}
308-
309-
// Attempt frame delivery.
310-
self.rx_buffer.notify_queue(rx_queue);
311-
true
312318
}
313319

314320
/// Parse available RX `DescriptorChains` from the queue and
@@ -457,6 +463,10 @@ impl Net {
457463

458464
/// Read as many frames as possible.
459465
fn process_rx(&mut self) -> Result<(), DeviceError> {
466+
if !self.send_deferred_rx_bytes() {
467+
return Ok(());
468+
}
469+
460470
if self.rx_buffer.is_empty() {
461471
self.parse_rx_descriptors();
462472
}
@@ -468,11 +478,13 @@ impl Net {
468478
break;
469479
}
470480
Ok(count) => {
471-
self.rx_bytes_read = count;
481+
// SAFETY: the match statement guarantees that `count` is not zero.
482+
unsafe {
483+
self.deferred_rx_bytes = Some(NonZeroUsize::new_unchecked(count));
484+
}
472485
self.metrics.rx_count.inc();
473486
self.metrics.rx_packets_count.inc();
474-
if !self.rate_limited_rx_single_frame() {
475-
self.rx_deferred_frame = true;
487+
if !self.send_deferred_rx_bytes() {
476488
break;
477489
}
478490
}
@@ -498,26 +510,6 @@ impl Net {
498510
self.try_signal_queue(NetQueue::Rx)
499511
}
500512

501-
// Process the deferred frame first, then continue reading from tap.
502-
fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> {
503-
if self.rate_limited_rx_single_frame() {
504-
self.rx_deferred_frame = false;
505-
// process_rx() was interrupted possibly before consuming all
506-
// packets in the tap; try continuing now.
507-
return self.process_rx();
508-
}
509-
510-
self.try_signal_queue(NetQueue::Rx)
511-
}
512-
513-
fn resume_rx(&mut self) -> Result<(), DeviceError> {
514-
if self.rx_deferred_frame {
515-
self.handle_deferred_frame()
516-
} else {
517-
self.process_rx()
518-
}
519-
}
520-
521513
fn process_tx(&mut self) -> Result<(), DeviceError> {
522514
// This is safe since we checked in the event handler that the device is activated.
523515
let mem = self.device_state.mem().unwrap();
@@ -576,7 +568,7 @@ impl Net {
576568
&self.metrics,
577569
)
578570
.unwrap_or(false);
579-
if frame_consumed_by_mmds && !self.rx_deferred_frame {
571+
if frame_consumed_by_mmds {
580572
// MMDS consumed this frame/request, let's also try to process the response.
581573
process_rx_for_mmds = true;
582574
}
@@ -687,7 +679,7 @@ impl Net {
687679
self.metrics.rx_rate_limiter_throttled.inc();
688680
} else {
689681
// If the limiter is not blocked, resume the receiving of bytes.
690-
self.resume_rx()
682+
self.process_rx()
691683
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
692684
}
693685
}
@@ -696,31 +688,14 @@ impl Net {
696688
// This is safe since we checked in the event handler that the device is activated.
697689
self.metrics.rx_tap_event_count.inc();
698690

699-
// While there are no available RX queue buffers and there's a deferred_frame
700-
// don't process any more incoming. Otherwise start processing a frame. In the
701-
// process the deferred_frame flag will be set in order to avoid freezing the
702-
// RX queue.
703-
if self.queues[RX_INDEX].is_empty() && self.rx_deferred_frame {
704-
self.metrics.no_rx_avail_buffer.inc();
705-
return;
706-
}
707-
708691
// While limiter is blocked, don't process any more incoming.
709692
if self.rx_rate_limiter.is_blocked() {
710693
self.metrics.rx_rate_limiter_throttled.inc();
711694
return;
712695
}
713696

714-
if self.rx_deferred_frame
715-
// Process a deferred frame first if available. Don't read from tap again
716-
// until we manage to receive this deferred frame.
717-
{
718-
self.handle_deferred_frame()
719-
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
720-
} else {
721-
self.process_rx()
722-
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
723-
}
697+
self.process_rx()
698+
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
724699
}
725700

726701
/// Process a single TX queue event.
@@ -750,7 +725,7 @@ impl Net {
750725
match self.rx_rate_limiter.event_handler() {
751726
Ok(_) => {
752727
// There might be enough budget now to receive the frame.
753-
self.resume_rx()
728+
self.process_rx()
754729
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
755730
}
756731
Err(err) => {
@@ -779,7 +754,7 @@ impl Net {
779754

780755
/// Process device virtio queue(s).
781756
pub fn process_virtio_queues(&mut self) {
782-
let _ = self.resume_rx();
757+
let _ = self.process_rx();
783758
let _ = self.process_tx();
784759
}
785760
}
@@ -1197,7 +1172,7 @@ pub mod tests {
11971172
th.rxq.check_used_elem(0, 0, 0);
11981173
th.rxq.check_used_elem(1, 3, 0);
11991174
// Check that the frame wasn't deferred.
1200-
assert!(!th.net().rx_deferred_frame);
1175+
assert!(th.net().deferred_rx_bytes.is_none());
12011176
// Check that the frame has been written successfully to the valid Rx descriptor chain.
12021177
th.rxq
12031178
.check_used_elem(2, 4, frame.len().try_into().unwrap());
@@ -1244,7 +1219,7 @@ pub mod tests {
12441219
);
12451220

12461221
// Check that the frame wasn't deferred.
1247-
assert!(!th.net().rx_deferred_frame);
1222+
assert!(th.net().deferred_rx_bytes.is_none());
12481223
// Check that the used queue has advanced.
12491224
assert_eq!(th.rxq.used.idx.get(), 1);
12501225
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1297,7 +1272,7 @@ pub mod tests {
12971272
);
12981273

12991274
// Check that the frames weren't deferred.
1300-
assert!(!th.net().rx_deferred_frame);
1275+
assert!(th.net().deferred_rx_bytes.is_none());
13011276
// Check that the used queue has advanced.
13021277
assert_eq!(th.rxq.used.idx.get(), 2);
13031278
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1361,7 +1336,7 @@ pub mod tests {
13611336
);
13621337

13631338
// Check that the frame wasn't deferred.
1364-
assert!(!th.net().rx_deferred_frame);
1339+
assert!(th.net().deferred_rx_bytes.is_none());
13651340
// Check that the used queue has advanced.
13661341
assert_eq!(th.rxq.used.idx.get(), 2);
13671342
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1787,20 +1762,7 @@ pub mod tests {
17871762
// SAFETY: its a valid fd
17881763
unsafe { libc::close(th.net.lock().unwrap().tap.as_raw_fd()) };
17891764

1790-
// The RX queue is empty and rx_deffered_frame is set.
1791-
th.net().rx_deferred_frame = true;
1792-
check_metric_after_block!(
1793-
th.net().metrics.no_rx_avail_buffer,
1794-
1,
1795-
th.simulate_event(NetEvent::Tap)
1796-
);
1797-
1798-
// We need to set this here to false, otherwise the device will try to
1799-
// handle a deferred frame, it will fail and will never try to read from
1800-
// the tap.
1801-
th.net().rx_deferred_frame = false;
1802-
1803-
// Fake an avail buffer; this time, tap reading should error out.
1765+
// Fake an avail buffer; tap reading should error out.
18041766
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
18051767
check_metric_after_block!(
18061768
th.net().metrics.tap_read_fails,
@@ -1809,59 +1771,6 @@ pub mod tests {
18091771
);
18101772
}
18111773

1812-
#[test]
1813-
fn test_deferred_frame() {
1814-
let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
1815-
let mut th = TestHelper::get_default(&mem);
1816-
th.activate_net();
1817-
1818-
let rx_packets_count = th.net().metrics.rx_packets_count.count();
1819-
let _ = inject_tap_tx_frame(&th.net(), 1000);
1820-
// Trigger a Tap event that. This should fail since there
1821-
// are not any available descriptors in the queue
1822-
check_metric_after_block!(
1823-
th.net().metrics.no_rx_avail_buffer,
1824-
1,
1825-
th.simulate_event(NetEvent::Tap)
1826-
);
1827-
// The frame we read from the tap should be deferred now and
1828-
// no frames should have been transmitted
1829-
assert!(th.net().rx_deferred_frame);
1830-
assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count);
1831-
1832-
// Let's add a second frame, which should really have the same
1833-
// fate.
1834-
let _ = inject_tap_tx_frame(&th.net(), 1000);
1835-
1836-
// Adding a descriptor in the queue. This should handle the first deferred
1837-
// frame. However, this should try to handle the second tap as well and fail
1838-
// since there's only one Descriptor Chain in the queue.
1839-
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
1840-
check_metric_after_block!(
1841-
th.net().metrics.no_rx_avail_buffer,
1842-
1,
1843-
th.simulate_event(NetEvent::Tap)
1844-
);
1845-
// We should still have a deferred frame
1846-
assert!(th.net().rx_deferred_frame);
1847-
// However, we should have delivered the first frame
1848-
assert_eq!(
1849-
th.net().metrics.rx_packets_count.count(),
1850-
rx_packets_count + 1
1851-
);
1852-
1853-
// Let's add one more descriptor and try to handle the last frame as well.
1854-
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
1855-
check_metric_after_block!(
1856-
th.net().metrics.rx_packets_count,
1857-
1,
1858-
th.simulate_event(NetEvent::RxQueue)
1859-
);
1860-
1861-
// We should be done with any deferred frame
1862-
assert!(!th.net().rx_deferred_frame);
1863-
}
1864-
18651774
#[test]
18661775
fn test_rx_rate_limiter_handling() {
18671776
let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
@@ -1974,7 +1883,7 @@ pub mod tests {
19741883
let mut rl = RateLimiter::new(1000, 0, 500, 0, 0, 0).unwrap();
19751884

19761885
// set up RX
1977-
assert!(!th.net().rx_deferred_frame);
1886+
assert!(th.net().deferred_rx_bytes.is_none());
19781887
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
19791888

19801889
let frame = inject_tap_tx_frame(&th.net(), 1000);
@@ -1994,7 +1903,7 @@ pub mod tests {
19941903
// assert that limiter is blocked
19951904
assert!(th.net().rx_rate_limiter.is_blocked());
19961905
assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1);
1997-
assert!(th.net().rx_deferred_frame);
1906+
assert!(th.net().deferred_rx_bytes.is_some());
19981907
// assert that no operation actually completed (limiter blocked it)
19991908
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
20001909
// make sure the data is still queued for processing
@@ -2092,7 +2001,7 @@ pub mod tests {
20922001
let mut rl = RateLimiter::new(0, 0, 0, 1, 0, 500).unwrap();
20932002

20942003
// set up RX
2095-
assert!(!th.net().rx_deferred_frame);
2004+
assert!(th.net().deferred_rx_bytes.is_none());
20962005
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
20972006
let frame = inject_tap_tx_frame(&th.net(), 1234);
20982007

@@ -2114,7 +2023,7 @@ pub mod tests {
21142023
// assert that limiter is blocked
21152024
assert!(th.net().rx_rate_limiter.is_blocked());
21162025
assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1);
2117-
assert!(th.net().rx_deferred_frame);
2026+
assert!(th.net().deferred_rx_bytes.is_some());
21182027
// assert that no operation actually completed (limiter blocked it)
21192028
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
21202029
// make sure the data is still queued for processing

0 commit comments

Comments
 (0)