Skip to content

Commit bd68fbd

Browse files
theanarkhtargos
authored andcommitted
worker: add cpuUsage for worker
PR-URL: #59177 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Ilyas Shabi <[email protected]>
1 parent 4a907bd commit bd68fbd

File tree

9 files changed

+225
-1
lines changed

9 files changed

+225
-1
lines changed

doc/api/worker_threads.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,6 +1758,19 @@ added: v10.5.0
17581758
The `'online'` event is emitted when the worker thread has started executing
17591759
JavaScript code.
17601760

1761+
### `worker.cpuUsage([prev])`
1762+
1763+
<!-- YAML
1764+
added:
1765+
- REPLACEME
1766+
-->
1767+
1768+
* Returns: {Promise}
1769+
1770+
This method returns a `Promise` that will resolve to an object identical to [`process.threadCpuUsage()`][],
1771+
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
1772+
This methods allows the statistics to be observed from outside the actual thread.
1773+
17611774
### `worker.getHeapSnapshot([options])`
17621775

17631776
<!-- YAML
@@ -2119,6 +2132,7 @@ thread spawned will spawn another until the application crashes.
21192132
[`process.stderr`]: process.md#processstderr
21202133
[`process.stdin`]: process.md#processstdin
21212134
[`process.stdout`]: process.md#processstdout
2135+
[`process.threadCpuUsage()`]: process.md#processthreadcpuusagepreviousvalue
21222136
[`process.title`]: process.md#processtitle
21232137
[`require('node:worker_threads').isMainThread`]: #workerismainthread
21242138
[`require('node:worker_threads').parentPort.on('message')`]: #event-message

lib/internal/worker.js

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
Float64Array,
99
FunctionPrototypeBind,
1010
MathMax,
11+
NumberMAX_SAFE_INTEGER,
1112
ObjectEntries,
1213
Promise,
1314
PromiseResolve,
@@ -40,6 +41,7 @@ const {
4041
ERR_WORKER_INVALID_EXEC_ARGV,
4142
ERR_INVALID_ARG_TYPE,
4243
ERR_INVALID_ARG_VALUE,
44+
ERR_OPERATION_FAILED,
4345
} = errorCodes;
4446

4547
const workerIo = require('internal/worker/io');
@@ -60,7 +62,7 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
6062
const { deserializeError } = require('internal/error_serdes');
6163
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6264
const { kEmptyObject } = require('internal/util');
63-
const { validateArray, validateString } = require('internal/validators');
65+
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
6466
const {
6567
throwIfBuildingSnapshot,
6668
} = require('internal/v8/startup_snapshot');
@@ -474,6 +476,37 @@ class Worker extends EventEmitter {
474476
};
475477
});
476478
}
479+
480+
cpuUsage(prev) {
481+
if (prev) {
482+
validateObject(prev, 'prev');
483+
validateNumber(prev.user, 'prev.user', 0, NumberMAX_SAFE_INTEGER);
484+
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
485+
}
486+
if (process.platform === 'sunos') {
487+
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
488+
}
489+
const taker = this[kHandle]?.cpuUsage();
490+
return new Promise((resolve, reject) => {
491+
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
492+
taker.ondone = (err, current) => {
493+
if (err !== null) {
494+
return reject(err);
495+
}
496+
if (prev) {
497+
resolve({
498+
user: current.user - prev.user,
499+
system: current.system - prev.system,
500+
});
501+
} else {
502+
resolve({
503+
user: current.user,
504+
system: current.system,
505+
});
506+
}
507+
};
508+
});
509+
}
477510
}
478511

479512
/**

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ namespace node {
7979
V(UDPWRAP) \
8080
V(SIGINTWATCHDOG) \
8181
V(WORKER) \
82+
V(WORKERCPUUSAGE) \
8283
V(WORKERHEAPSNAPSHOT) \
8384
V(WORKERHEAPSTATISTICS) \
8485
V(WRITEWRAP) \

src/env_properties.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@
475475
V(tcp_constructor_template, v8::FunctionTemplate) \
476476
V(tty_constructor_template, v8::FunctionTemplate) \
477477
V(write_wrap_template, v8::ObjectTemplate) \
478+
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
478479
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
479480
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
480481
V(x509_constructor_template, v8::FunctionTemplate)

src/node_worker.cc

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ using v8::Isolate;
3232
using v8::Local;
3333
using v8::Locker;
3434
using v8::Maybe;
35+
using v8::Name;
3536
using v8::Null;
3637
using v8::Number;
3738
using v8::Object;
@@ -810,6 +811,81 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
810811
}
811812
}
812813

814+
class WorkerCpuUsageTaker : public AsyncWrap {
815+
public:
816+
WorkerCpuUsageTaker(Environment* env, Local<Object> obj)
817+
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERCPUUSAGE) {}
818+
819+
SET_NO_MEMORY_INFO()
820+
SET_MEMORY_INFO_NAME(WorkerCpuUsageTaker)
821+
SET_SELF_SIZE(WorkerCpuUsageTaker)
822+
};
823+
824+
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
825+
Worker* w;
826+
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
827+
828+
Environment* env = w->env();
829+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
830+
Local<Object> wrap;
831+
if (!env->worker_cpu_usage_taker_template()
832+
->NewInstance(env->context())
833+
.ToLocal(&wrap)) {
834+
return;
835+
}
836+
837+
BaseObjectPtr<WorkerCpuUsageTaker> taker =
838+
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
839+
840+
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
841+
env](Environment* worker_env) mutable {
842+
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
843+
int err = uv_getrusage_thread(cpu_usage_stats.get());
844+
845+
env->SetImmediateThreadsafe(
846+
[taker = std::move(taker),
847+
cpu_usage_stats = std::move(cpu_usage_stats),
848+
err = err](Environment* env) mutable {
849+
Isolate* isolate = env->isolate();
850+
HandleScope handle_scope(isolate);
851+
Context::Scope context_scope(env->context());
852+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
853+
854+
Local<Value> argv[] = {
855+
Null(isolate),
856+
Undefined(isolate),
857+
};
858+
859+
if (err) {
860+
argv[0] = UVException(
861+
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
862+
} else {
863+
Local<Name> names[] = {
864+
FIXED_ONE_BYTE_STRING(isolate, "user"),
865+
FIXED_ONE_BYTE_STRING(isolate, "system"),
866+
};
867+
Local<Value> values[] = {
868+
Number::New(isolate,
869+
1e6 * cpu_usage_stats->ru_utime.tv_sec +
870+
cpu_usage_stats->ru_utime.tv_usec),
871+
Number::New(isolate,
872+
1e6 * cpu_usage_stats->ru_stime.tv_sec +
873+
cpu_usage_stats->ru_stime.tv_usec),
874+
};
875+
argv[1] = Object::New(
876+
isolate, Null(isolate), names, values, arraysize(names));
877+
}
878+
879+
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
880+
},
881+
CallbackFlags::kUnrefed);
882+
});
883+
884+
if (scheduled) {
885+
args.GetReturnValue().Set(wrap);
886+
}
887+
}
888+
813889
class WorkerHeapStatisticsTaker : public AsyncWrap {
814890
public:
815891
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
@@ -1101,6 +1177,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11011177
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
11021178
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
11031179
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
1180+
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);
11041181

11051182
SetConstructorFunction(isolate, target, "Worker", w);
11061183
}
@@ -1133,6 +1210,19 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11331210
wst->InstanceTemplate());
11341211
}
11351212

1213+
{
1214+
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
1215+
1216+
wst->InstanceTemplate()->SetInternalFieldCount(
1217+
WorkerCpuUsageTaker::kInternalFieldCount);
1218+
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
1219+
1220+
Local<String> wst_string =
1221+
FIXED_ONE_BYTE_STRING(isolate, "WorkerCpuUsageTaker");
1222+
wst->SetClassName(wst_string);
1223+
isolate_data->set_worker_cpu_usage_taker_template(wst->InstanceTemplate());
1224+
}
1225+
11361226
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
11371227
}
11381228

@@ -1199,6 +1289,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
11991289
registry->Register(Worker::LoopIdleTime);
12001290
registry->Register(Worker::LoopStartTime);
12011291
registry->Register(Worker::GetHeapStatistics);
1292+
registry->Register(Worker::CpuUsage);
12021293
}
12031294

12041295
} // anonymous namespace

src/node_worker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class Worker : public AsyncWrap {
8080
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
8181
static void GetHeapStatistics(
8282
const v8::FunctionCallbackInfo<v8::Value>& args);
83+
static void CpuUsage(const v8::FunctionCallbackInfo<v8::Value>& args);
8384

8485
private:
8586
bool CreateEnvMessagePort(Environment* env);
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { isSunOS } = require('../common');
4+
const assert = require('assert');
5+
const {
6+
Worker,
7+
} = require('worker_threads');
8+
9+
function validate(result) {
10+
assert.ok(typeof result == 'object' && result !== null);
11+
assert.ok(result.user >= 0);
12+
assert.ok(result.system >= 0);
13+
assert.ok(Number.isFinite(result.user));
14+
assert.ok(Number.isFinite(result.system));
15+
}
16+
17+
function check(worker) {
18+
[
19+
-1,
20+
1.1,
21+
NaN,
22+
undefined,
23+
{},
24+
[],
25+
null,
26+
function() {},
27+
Symbol(),
28+
true,
29+
Infinity,
30+
{ user: -1, system: 1 },
31+
{ user: 1, system: -1 },
32+
].forEach((value) => {
33+
try {
34+
worker.cpuUsage(value);
35+
} catch (e) {
36+
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
37+
}
38+
});
39+
}
40+
41+
const worker = new Worker(`
42+
const { parentPort } = require('worker_threads');
43+
parentPort.on('message', () => {});
44+
`, { eval: true });
45+
46+
// See test-process-threadCpuUsage-main-thread.js
47+
if (isSunOS) {
48+
assert.throws(
49+
() => worker.cpuUsage(),
50+
{
51+
code: 'ERR_OPERATION_FAILED',
52+
name: 'Error',
53+
message: 'Operation failed: worker.cpuUsage() is not available on SunOS'
54+
}
55+
);
56+
worker.terminate();
57+
} else {
58+
worker.on('online', common.mustCall(async () => {
59+
check(worker);
60+
61+
const prev = await worker.cpuUsage();
62+
validate(prev);
63+
64+
const curr = await worker.cpuUsage();
65+
validate(curr);
66+
67+
assert.ok(curr.user >= prev.user);
68+
assert.ok(curr.system >= prev.system);
69+
70+
const delta = await worker.cpuUsage(curr);
71+
validate(delta);
72+
73+
worker.terminate();
74+
}));
75+
76+
worker.once('exit', common.mustCall(async () => {
77+
await assert.rejects(worker.cpuUsage(), {
78+
code: 'ERR_WORKER_NOT_RUNNING'
79+
});
80+
}));
81+
}

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const { getSystemErrorName } = require('util');
6262
delete providers.SIGINTWATCHDOG;
6363
delete providers.WORKERHEAPSNAPSHOT;
6464
delete providers.WORKERHEAPSTATISTICS;
65+
delete providers.WORKERCPUUSAGE;
6566
delete providers.BLOBREADER;
6667
delete providers.RANDOMPRIMEREQUEST;
6768
delete providers.CHECKPRIMEREQUEST;

typings/internalBinding/worker.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ declare namespace InternalWorkerBinding {
1616
getResourceLimits(): Float64Array;
1717
takeHeapSnapshot(): object;
1818
getHeapStatistics(): Promise<object>;
19+
cpuUsage(): Promise<object>;
1920
loopIdleTime(): number;
2021
loopStartTime(): number;
2122
}

0 commit comments

Comments
 (0)