@@ -74,9 +74,12 @@ impl Io {
7474 let payload_len: usize = max_mtu. into ( ) ;
7575 let payload_len = payload_len as u32 ;
7676
77- let ( rx, rx_producer) = {
78- let entries = 1024 ;
77+ // This number is somewhat arbitrary but it's a decent number of messages without it consuming
78+ // large in memory. Eventually, it might be a good idea to expose this value in the
79+ // builder, but we'll wait until someone asks for it :).
80+ let entries = 1024 ;
7981
82+ let ( rx, rx_producer) = {
8083 let mut consumers = vec ! [ ] ;
8184
8285 let ( producer, consumer) = ring:: pair ( entries, payload_len) ;
@@ -88,31 +91,34 @@ impl Io {
8891 } ;
8992
9093 let ( tx, tx_consumer) = {
91- let entries = 1024 ;
92-
9394 let mut producers = vec ! [ ] ;
9495
9596 let ( producer, consumer) = ring:: pair ( entries, payload_len) ;
9697 producers. push ( producer) ;
9798
9899 let gso = crate :: features:: Gso :: default ( ) ;
100+
101+ // GSO is not supported by turmoil so disable it
99102 gso. disable ( ) ;
103+
100104 let tx = tx:: Tx :: new ( producers, gso, max_mtu) ;
101105
102106 ( tx, consumer)
103107 } ;
104108
109+ // Spawn a task that does the actual socket calls and coordinates with the event loop
110+ // through the ring buffers
105111 tokio:: spawn ( run_io ( socket, rx_producer, tx_consumer) ) ;
106112
107- let el = EventLoop {
113+ let event_loop = EventLoop {
108114 clock,
109115 rx,
110116 tx,
111117 endpoint,
112118 }
113119 . start ( ) ;
114120
115- Ok ( ( el , local_addr) )
121+ Ok ( ( event_loop , local_addr) )
116122 }
117123
118124 pub fn start < E : Endpoint < PathHandle = PathHandle > > (
@@ -153,19 +159,26 @@ async fn run_io(
153159
154160 loop {
155161 let socket_ready = socket. readable ( ) ;
156- let consumer_ready = poll_fn ( |cx| consumer. poll_acquire ( 1 , cx) ) ;
162+ let consumer_ready = poll_fn ( |cx| consumer. poll_acquire ( u32 :: MAX , cx) ) ;
157163 let producer_ready = async {
164+ // Only poll the producer if we need more capacity - otherwise we would constantly wake
165+ // up
158166 if poll_producer {
159- poll_fn ( |cx| producer. poll_acquire ( 1 , cx) ) . await
167+ poll_fn ( |cx| producer. poll_acquire ( u32 :: MAX , cx) ) . await
160168 } else {
161169 core:: future:: pending ( ) . await
162170 }
163171 } ;
172+ // The socket task doesn't have any application wakeups to handle so just make it pending
173+ let application_wakeup = core:: future:: pending ( ) ;
164174
175+ // We replace the timer future with the `socket_ready` instead, since we don't have a
176+ // timer here. Other than the application wakeup, Select doesn't really treat any of
177+ // the futures special.
165178 let is_readable = Select :: new (
166179 consumer_ready,
167180 producer_ready,
168- core :: future :: pending ( ) ,
181+ application_wakeup ,
169182 socket_ready,
170183 )
171184 . await
@@ -175,8 +188,11 @@ async fn run_io(
175188 if is_readable {
176189 let mut count = 0 ;
177190 for entry in producer. data ( ) {
191+ // Since UDP sockets are stateless, the only errors we should back is a WouldBlock.
192+ // If we get any errors, we'll try again later.
178193 if let Ok ( ( len, addr) ) = socket. try_recv_from ( entry. payload_mut ( ) ) {
179194 count += 1 ;
195+ // update the packet information
180196 entry. set_remote_address ( & ( addr. into ( ) ) ) ;
181197 unsafe {
182198 entry. set_payload_len ( len) ;
@@ -186,6 +202,7 @@ async fn run_io(
186202 }
187203 }
188204
205+ // release the received messages to the consumer
189206 producer. release ( count) ;
190207
191208 // only poll the producer if we need entries
@@ -198,16 +215,21 @@ async fn run_io(
198215 let addr = * entry. remote_address ( ) ;
199216 let addr: std:: net:: SocketAddr = addr. into ( ) ;
200217 let payload = entry. payload_mut ( ) ;
218+ // Since UDP sockets are stateless, the only errors we should back is a WouldBlock.
219+ // If we get any errors, we'll try again later.
201220 if socket. try_send_to ( payload, addr) . is_ok ( ) {
202221 count += 1 ;
203222 } else {
204223 break ;
205224 }
206225 }
226+
227+ // release capacity back to the producer
207228 consumer. release ( count) ;
208229 }
209230
210- if !( producer. is_open ( ) || consumer. is_open ( ) ) {
231+ // check to see if the rings are open, otherwise we need to shut down the task
232+ if !( producer. is_open ( ) && consumer. is_open ( ) ) {
211233 return Ok ( ( ) ) ;
212234 }
213235 }
0 commit comments