@@ -4,12 +4,14 @@ use std::{future::pending, sync::atomic::Ordering, time::Duration};
4
4
use std:: thread:: ThreadId ;
5
5
6
6
use event_worker:: events:: ShutdownReason ;
7
- use log:: error;
7
+ use log:: { error, info } ;
8
8
use sb_workers:: context:: { Timing , TimingStatus , UserWorkerMsgs } ;
9
+ use tokio_util:: sync:: CancellationToken ;
9
10
10
11
use crate :: rt_worker:: supervisor:: {
11
- create_wall_clock_beforeunload_alert, v8_handle_early_retire,
12
- v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage , Tokens ,
12
+ create_wall_clock_beforeunload_alert, v8_handle_early_drop_beforeunload,
13
+ v8_handle_early_retire, v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage , Tokens ,
14
+ V8HandleEarlyRetireData ,
13
15
} ;
14
16
15
17
use super :: { v8_handle_termination, Arguments , CPUUsageMetrics , V8HandleTerminationData } ;
@@ -57,12 +59,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
57
59
let mut is_worker_entered = false ;
58
60
let mut is_wall_clock_beforeunload_armed = false ;
59
61
let mut is_cpu_time_soft_limit_reached = false ;
60
- let mut is_termination_requested = false ;
62
+ let mut is_waiting_for_termination = false ;
61
63
let mut have_all_reqs_been_acknowledged = false ;
62
64
63
65
let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx. unwrap ( ) ;
64
66
let mut cpu_usage_ms = 0i64 ;
65
67
68
+ let mut complete_reason = None :: < ShutdownReason > ;
66
69
let mut wall_clock_alerts = 0 ;
67
70
let mut req_ack_count = 0usize ;
68
71
@@ -97,6 +100,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
97
100
guard. raise ( ) ;
98
101
} ;
99
102
103
+ let early_drop_token = CancellationToken :: new ( ) ;
104
+ let early_drop_fut = early_drop_token. cancelled ( ) ;
105
+
106
+ let mut dispatch_early_drop_beforeunload_fn = Some ( {
107
+ let token = early_drop_token. clone ( ) ;
108
+ || {
109
+ let data_ptr_mut = Box :: into_raw ( Box :: new ( V8HandleEarlyRetireData { token } ) ) ;
110
+
111
+ if !thread_safe_handle. request_interrupt (
112
+ v8_handle_early_drop_beforeunload,
113
+ data_ptr_mut as * mut std:: ffi:: c_void ,
114
+ ) {
115
+ drop ( unsafe { Box :: from_raw ( data_ptr_mut) } ) ;
116
+ } else {
117
+ waker. wake ( ) ;
118
+ }
119
+ }
120
+ } ) ;
121
+
100
122
let terminate_fn = {
101
123
let thread_safe_handle = thread_safe_handle. clone ( ) ;
102
124
move || {
@@ -115,23 +137,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
115
137
116
138
tokio:: pin!( wall_clock_duration_alert) ;
117
139
tokio:: pin!( wall_clock_beforeunload_alert) ;
140
+ tokio:: pin!( early_drop_fut) ;
118
141
119
- let result = ' scope : loop {
142
+ loop {
120
143
tokio:: select! {
121
144
_ = supervise. cancelled( ) => {
122
- break ' scope ( ShutdownReason :: TerminationRequested , cpu_usage_ms ) ;
145
+ complete_reason = Some ( ShutdownReason :: TerminationRequested ) ;
123
146
}
124
147
125
148
_ = async {
126
149
match termination. as_ref( ) {
127
150
Some ( token) => token. inbound. cancelled( ) . await ,
128
151
None => pending( ) . await ,
129
152
}
130
- } , if !is_termination_requested => {
131
- is_termination_requested = true ;
153
+ } , if !is_waiting_for_termination => {
154
+ is_waiting_for_termination = true ;
132
155
if promise_metrics. have_all_promises_been_resolved( ) {
133
- terminate_fn( ) ;
134
- break ' scope ( ShutdownReason :: TerminationRequested , cpu_usage_ms) ;
156
+ if let Some ( func) = dispatch_early_drop_beforeunload_fn. take( ) {
157
+ func( ) ;
158
+ }
135
159
}
136
160
}
137
161
@@ -164,9 +188,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
164
188
165
189
if !cpu_timer_param. is_disabled( ) {
166
190
if cpu_usage_ms >= hard_limit_ms as i64 {
167
- terminate_fn( ) ;
168
191
error!( "CPU time hard limit reached: isolate: {:?}" , key) ;
169
- break ' scope ( ShutdownReason :: CPUTime , cpu_usage_ms ) ;
192
+ complete_reason = Some ( ShutdownReason :: CPUTime ) ;
170
193
} else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
171
194
early_retire_fn( ) ;
172
195
error!( "CPU time soft limit reached: isolate: {:?}" , key) ;
@@ -177,17 +200,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
177
200
if have_all_reqs_been_acknowledged
178
201
&& promise_metrics. have_all_promises_been_resolved( )
179
202
{
180
- terminate_fn ( ) ;
181
- error! ( "early termination due to the last request being completed: isolate: {:?}" , key ) ;
182
- break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms ) ;
203
+ if let Some ( func ) = dispatch_early_drop_beforeunload_fn . take ( ) {
204
+ func ( ) ;
205
+ }
183
206
}
184
207
185
208
} else if is_cpu_time_soft_limit_reached
186
209
&& have_all_reqs_been_acknowledged
187
210
&& promise_metrics. have_all_promises_been_resolved( )
188
211
{
189
- terminate_fn( ) ;
190
- break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
212
+ if let Some ( func) = dispatch_early_drop_beforeunload_fn. take( ) {
213
+ func( ) ;
214
+ }
191
215
}
192
216
}
193
217
}
@@ -206,14 +230,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
206
230
if have_all_reqs_been_acknowledged
207
231
&& promise_metrics. have_all_promises_been_resolved( )
208
232
{
209
- terminate_fn ( ) ;
210
- error! ( "early termination due to the last request being completed: isolate: {:?}" , key ) ;
211
- break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms ) ;
233
+ if let Some ( func ) = dispatch_early_drop_beforeunload_fn . take ( ) {
234
+ func ( ) ;
235
+ }
212
236
}
213
237
} else {
214
- terminate_fn( ) ;
215
238
error!( "CPU time hard limit reached: isolate: {:?}" , key) ;
216
- break ' scope ( ShutdownReason :: CPUTime , cpu_usage_ms ) ;
239
+ complete_reason = Some ( ShutdownReason :: CPUTime ) ;
217
240
}
218
241
}
219
242
}
@@ -237,9 +260,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
237
260
continue ;
238
261
}
239
262
240
- terminate_fn ( ) ;
241
- error! ( "early termination due to the last request being completed: isolate: {:?}" , key ) ;
242
- break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms ) ;
263
+ if let Some ( func ) = dispatch_early_drop_beforeunload_fn . take ( ) {
264
+ func ( ) ;
265
+ }
243
266
}
244
267
245
268
_ = wall_clock_duration_alert. tick( ) , if !is_wall_clock_limit_disabled => {
@@ -253,9 +276,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
253
276
} else {
254
277
let is_in_flight_req_exists = req_ack_count != demand. load( Ordering :: Acquire ) ;
255
278
256
- terminate_fn( ) ;
257
279
error!( "wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})" , key, is_in_flight_req_exists) ;
258
- break ' scope ( ShutdownReason :: WallClockTime , cpu_usage_ms ) ;
280
+ complete_reason = Some ( ShutdownReason :: WallClockTime ) ;
259
281
}
260
282
}
261
283
@@ -273,18 +295,34 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
273
295
}
274
296
275
297
Some ( _) = memory_limit_rx. recv( ) => {
276
- terminate_fn( ) ;
277
298
error!( "memory limit reached for the worker: isolate: {:?}" , key) ;
278
- break ' scope ( ShutdownReason :: Memory , cpu_usage_ms ) ;
299
+ complete_reason = Some ( ShutdownReason :: Memory ) ;
279
300
}
280
- }
281
- } ;
282
301
283
- match result {
284
- ( ShutdownReason :: EarlyDrop , cpu_usage_ms) if is_termination_requested => {
285
- ( ShutdownReason :: TerminationRequested , cpu_usage_ms)
302
+ _ = & mut early_drop_fut => {
303
+ info!( "early termination has been triggered: isolate: {:?}" , key) ;
304
+ complete_reason = Some ( ShutdownReason :: EarlyDrop ) ;
305
+ }
286
306
}
287
307
288
- result => result,
308
+ match complete_reason. take ( ) {
309
+ Some ( ShutdownReason :: EarlyDrop ) => {
310
+ terminate_fn ( ) ;
311
+ return (
312
+ if is_waiting_for_termination {
313
+ ShutdownReason :: TerminationRequested
314
+ } else {
315
+ ShutdownReason :: EarlyDrop
316
+ } ,
317
+ cpu_usage_ms,
318
+ ) ;
319
+ }
320
+
321
+ Some ( result) => {
322
+ terminate_fn ( ) ;
323
+ return ( result, cpu_usage_ms) ;
324
+ }
325
+ None => continue ,
326
+ }
289
327
}
290
328
}
0 commit comments