Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/base/src/worker/supervisor/strategy_per_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use base_rt::RuntimeState;
use deno_core::unsync::sync::AtomicFlag;
Expand Down Expand Up @@ -37,6 +38,8 @@ use super::V8HandleTerminationData;

#[derive(Debug, Default)]
struct State {
req_absent_duration: Option<Duration>,

is_worker_entered: bool,
is_wall_clock_limit_disabled: bool,
is_wall_clock_beforeunload_armed: bool,
Expand All @@ -49,6 +52,7 @@ struct State {
wall_clock_alerts: usize,

req_ack_count: usize,
last_req_ack: Option<SystemTime>,
req_demand: Arc<AtomicUsize>,

runtime: Arc<RuntimeState>,
Expand Down Expand Up @@ -80,6 +84,7 @@ impl State {

fn req_acknowledged(&mut self) {
self.req_ack_count += 1;
self.last_req_ack = Some(SystemTime::now());
self.update_runtime_state();
}

Expand All @@ -92,6 +97,14 @@ impl State {
|| self.is_cpu_time_soft_limit_reached
|| self.is_mem_half_reached
|| self.wall_clock_alerts == 2
|| matches!(
self
.last_req_ack
.as_ref()
.zip(self.req_absent_duration)
.and_then(|(t, d)| t.checked_add(d)),
Some(t) if t < SystemTime::now()
)
}

fn have_all_reqs_been_acknowledged(&self) -> bool {
Expand Down Expand Up @@ -143,6 +156,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

let mut complete_reason = None::<ShutdownReason>;
let mut state = State {
req_absent_duration: runtime_opts
.context
.as_ref()
.and_then(|it| it.get("supervisor"))
.and_then(|it| {
it.get("requestAbsentTimeoutMs")
.and_then(|it| it.as_u64())
.map(Duration::from_millis)
}),

is_wall_clock_limit_disabled: worker_timeout_ms == 0,
is_cpu_time_limit_disabled: cpu_time_soft_limit_ms == 0
&& cpu_time_hard_limit_ms == 0,
Expand Down
11 changes: 9 additions & 2 deletions crates/base/test_cases/main/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
console.log("main function started");

function parseIntFromHeadersOrDefault(req: Request, key: string, val: number) {
function parseIntFromHeadersOrDefault(req: Request, key: string, val?: number) {
const headerValue = req.headers.get(key);
if (!headerValue) {
return val;
Expand Down Expand Up @@ -62,7 +62,14 @@ Deno.serve((req: Request) => {
const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]);
const context = {
sourceMap: req.headers.get("x-context-source-map") == "true",
useReadSyncFileAPI: req.headers.get("x-use-read-sync-file-api") == "true",
useReadSyncFileAPI:
req.headers.get("x-context-use-read-sync-file-api") == "true",
supervisor: {
requestAbsentTimeoutMs: parseIntFromHeadersOrDefault(
req,
"x-context-supervisor-request-absent-timeout-ms",
),
},
};

return await EdgeRuntime.userWorkers.create({
Expand Down
48 changes: 47 additions & 1 deletion crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2522,7 +2522,10 @@ async fn test_issue_func_205() {
b.uri("/issue-func-205")
.header("x-cpu-time-soft-limit-ms", HeaderValue::from_static("500"))
.header("x-cpu-time-hard-limit-ms", HeaderValue::from_static("1000"))
.header("x-use-read-sync-file-api", HeaderValue::from_static("true"))
.header(
"x-context-use-read-sync-file-api",
HeaderValue::from_static("true"),
)
.body(Body::empty())
.context("can't make request")
})
Expand Down Expand Up @@ -3865,6 +3868,49 @@ async fn test_eszip_wasm_import() {
);
}

#[tokio::test]
#[serial]
async fn test_request_absent_timeout() {
let (tx, mut rx) = mpsc::unbounded_channel();
let tb = TestBedBuilder::new("./test_cases/main")
.with_per_worker_policy(None)
.with_worker_event_sender(Some(tx))
.build()
.await;

let resp = tb
.request(|b| {
b.uri("/sleep-5000ms")
.header("x-worker-timeout-ms", HeaderValue::from_static("3600000"))
.header(
"x-context-supervisor-request-absent-timeout-ms",
HeaderValue::from_static("1000"),
)
.body(Body::empty())
.context("can't make request")
})
.await
.unwrap();

assert_eq!(resp.status().as_u16(), StatusCode::OK);

sleep(Duration::from_secs(3)).await;
rx.close();
tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;

while let Some(ev) = rx.recv().await {
let WorkerEvents::Shutdown(ev) = ev.event else {
continue;
};
if ev.reason != ShutdownReason::EarlyDrop {
break;
}
return;
}

unreachable!("test failed");
}

#[derive(Deserialize)]
struct ErrorResponsePayload {
msg: String,
Expand Down
3 changes: 3 additions & 0 deletions types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ interface UserWorkerCreateContext {
shouldBootstrapMockFnThrowError?: boolean | null;
suppressEszipMigrationWarning?: boolean | null;
useReadSyncFileAPI?: boolean | null;
supervisor?: {
requestAbsentTimeoutMs?: number | null;
};
}

interface UserWorkerCreateOptions {
Expand Down