Skip to content

Commit f40fa69

Browse files
committed
worker: add postMessageToWorker
1 parent 8f71a1b commit f40fa69

File tree

11 files changed

+550
-5
lines changed

11 files changed

+550
-5
lines changed

doc/api/errors.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3085,6 +3085,16 @@ The `Worker` initialization failed.
30853085
The `execArgv` option passed to the `Worker` constructor contains
30863086
invalid flags.
30873087

3088+
<a id="ERR_WORKER_INVALID_ID"></a>
3089+
3090+
### `ERR_WORKER_INVALID_ID`
3091+
3092+
<!-- YAML
3093+
added: REPLACEME
3094+
-->
3095+
3096+
The thread ID requested in [`postMessageToWorker()`][] is invalid or has no `workerMessage` listener.
3097+
30883098
<a id="ERR_WORKER_NOT_RUNNING"></a>
30893099

30903100
### `ERR_WORKER_NOT_RUNNING`
@@ -3104,6 +3114,16 @@ The `Worker` instance terminated because it reached its memory limit.
31043114
The path for the main script of a worker is neither an absolute path
31053115
nor a relative path starting with `./` or `../`.
31063116

3117+
<a id="ERR_WORKER_SAME_THREAD"></a>
3118+
3119+
### `ERR_WORKER_SAME_THREAD`
3120+
3121+
<!-- YAML
3122+
added: REPLACEME
3123+
-->
3124+
3125+
The thread id requested in [`postMessageToWorker()`][] is the current thread id.
3126+
31073127
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
31083128

31093129
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -4027,6 +4047,7 @@ An error occurred trying to allocate memory. This should never happen.
40274047
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
40284048
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
40294049
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
4050+
[`postMessageToWorker()`]: worker_threads.md#workerpostmessagetoworkerdestination-value-transferlist-timeout
40304051
[`process.on('exit')`]: process.md#event-exit
40314052
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
40324053
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn

doc/api/process.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
327327
likely best for long-running application) or upon process exit (which is likely
328328
most convenient for scripts).
329329

330+
### Event: `'workerMessage'`
331+
332+
<!-- YAML
333+
added: REPLACEME
334+
-->
335+
336+
* `value` {any} A value transmitted using [`postMessageToWorker()`][].
337+
* `source` {number} The transmitting worker thread ID or `0` for the main thread.
338+
339+
The `'workerMessage'` event is emitted for any incoming message send by the other
340+
party by using [`postMessageToWorker()`][].
341+
330342
### Event: `'uncaughtException'`
331343

332344
<!-- YAML
@@ -4073,6 +4085,7 @@ cases:
40734085
[`net.Server`]: net.md#class-netserver
40744086
[`net.Socket`]: net.md#class-netsocket
40754087
[`os.constants.dlopen`]: os.md#dlopen-constants
4088+
[`postMessageToWorker()`]: worker_threads.md#workerpostmessagetoworkerdestination-value-transferlist-timeout
40764089
[`process.argv`]: #processargv
40774090
[`process.config`]: #processconfig
40784091
[`process.execPath`]: #processexecpath

doc/api/worker_threads.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,111 @@ if (isMainThread) {
252252
}
253253
```
254254

255+
## `worker.postMessageToWorker(destination, value[, transferList][, timeout])`
256+
257+
<!-- YAML
258+
added: REPLACEME
259+
-->
260+
261+
> Stability: 1.1 - Active development
262+
263+
* `destination` {number} The target thread ID.
264+
* `value` {any} The value to send.
265+
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
266+
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
267+
See [`port.postMessage()`][] for more information.
268+
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
269+
By default it's `undefined`, which means wait forever.
270+
* Returns: {Promise} A promise which is fulfilled if the message was successfully sent.
271+
272+
Sends a value to another worker, identified by its thread ID.
273+
274+
If the target thread has no listener for the `workerMessage` event, then the operation will throw an error.
275+
276+
This method should be used when the target thread is not the direct
277+
parent or child of the current thread.
278+
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
279+
and the [`worker.postMessage()`][] to let the threads communicate.
280+
281+
The example below shows the use of of `postMessageToWorker`: it creates 10 nested threads,
282+
the last one will try to communicate with the main thread.
283+
284+
```mjs
285+
import { fileURLToPath } from 'node:url';
286+
import { once } from 'node:events';
287+
import process from 'node:process';
288+
import {
289+
isMainThread,
290+
postMessageToWorker,
291+
threadId,
292+
workerData,
293+
Worker,
294+
} from 'node:worker_threads';
295+
296+
const channel = new BroadcastChannel('sync');
297+
const level = workerData?.level ?? 0;
298+
299+
if (level < 10) {
300+
const worker = new Worker(fileURLToPath(import.meta.url), {
301+
workerData: { level: level + 1 },
302+
});
303+
}
304+
305+
if (level === 0) {
306+
process.on('workerMessage', (value, source) => {
307+
console.log(`${source} -> ${threadId}:`, value);
308+
postMessageToWorker(source, { message: 'pong' });
309+
});
310+
} else if (level === 10) {
311+
process.on('workerMessage', (value, source) => {
312+
console.log(`${source} -> ${threadId}:`, value);
313+
channel.postMessage('done');
314+
channel.close();
315+
});
316+
317+
await postMessageToWorker(0, { message: 'ping' });
318+
}
319+
320+
channel.onmessage = channel.close;
321+
```
322+
323+
```cjs
324+
const { once } = require('node:events');
325+
const {
326+
isMainThread,
327+
postMessageToWorker,
328+
threadId,
329+
workerData,
330+
Worker,
331+
} = require('node:worker_threads');
332+
333+
const channel = new BroadcastChannel('sync');
334+
const level = workerData?.level ?? 0;
335+
336+
if (level < 10) {
337+
const worker = new Worker(__filename, {
338+
workerData: { level: level + 1 },
339+
});
340+
}
341+
342+
if (level === 0) {
343+
process.on('workerMessage', (value, source) => {
344+
console.log(`${source} -> ${threadId}:`, value);
345+
postMessageToWorker(source, { message: 'pong' });
346+
});
347+
} else if (level === 10) {
348+
process.on('workerMessage', (value, source) => {
349+
console.log(`${source} -> ${threadId}:`, value);
350+
channel.postMessage('done');
351+
channel.close();
352+
});
353+
354+
postMessageToWorker(0, { message: 'ping' });
355+
}
356+
357+
channel.onmessage = channel.close;
358+
```
359+
255360
## `worker.receiveMessageOnPort(port)`
256361
257362
<!-- YAML

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,6 +1862,7 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
18621862
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
18631863
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
18641864
Error);
1865+
E('ERR_WORKER_INVALID_ID', 'Invalid worker id %d', Error);
18651866
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
18661867
E('ERR_WORKER_OUT_OF_MEMORY',
18671868
'Worker terminated due to reaching memory limit: %s', Error);
@@ -1876,6 +1877,7 @@ E('ERR_WORKER_PATH', (filename) =>
18761877
) +
18771878
` Received "${filename}"`,
18781879
TypeError);
1880+
E('ERR_WORKER_SAME_THREAD', 'Cannot connect to the same thread', Error);
18791881
E('ERR_WORKER_UNSERIALIZABLE_ERROR',
18801882
'Serializing an uncaught exception failed', Error);
18811883
E('ERR_WORKER_UNSUPPORTED_OPERATION',

lib/internal/main/worker_thread.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const {
4444
kStdioWantsMoreDataCallback,
4545
} = workerIo;
4646

47+
const { setupMainThreadPort } = require('internal/worker/messaging');
48+
4749
const {
4850
onGlobalUncaughtException,
4951
} = require('internal/process/execution');
@@ -96,6 +98,7 @@ port.on('message', (message) => {
9698
hasStdin,
9799
publicPort,
98100
workerData,
101+
mainThreadPort,
99102
} = message;
100103

101104
if (doEval !== 'internal') {
@@ -109,6 +112,7 @@ port.on('message', (message) => {
109112
}
110113

111114
require('internal/worker').assignEnvironmentData(environmentData);
115+
setupMainThreadPort(mainThreadPort);
112116

113117
if (SharedArrayBuffer !== undefined) {
114118
// The counter is only passed to the workers created by the main thread,

lib/internal/worker.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const {
5656
ReadableWorkerStdio,
5757
WritableWorkerStdio,
5858
} = workerIo;
59+
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
5960
const { deserializeError } = require('internal/error_serdes');
6061
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6162
const { kEmptyObject } = require('internal/util');
@@ -250,14 +251,18 @@ class Worker extends EventEmitter {
250251

251252
this[kParentSideStdio] = { stdin, stdout, stderr };
252253

253-
const { port1, port2 } = new MessageChannel();
254-
const transferList = [port2];
254+
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
255+
const {
256+
port1: publicPortToParent,
257+
port2: publicPortToWorker,
258+
} = new MessageChannel();
259+
const transferList = [mainThreadPortToWorker, publicPortToWorker];
255260
// If transferList is provided.
256261
if (options.transferList)
257262
ArrayPrototypePush(transferList,
258263
...new SafeArrayIterator(options.transferList));
259264

260-
this[kPublicPort] = port1;
265+
this[kPublicPort] = publicPortToParent;
261266
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
262267
this[kPublicPort].on(event, (message) => this.emit(event, message));
263268
});
@@ -271,8 +276,9 @@ class Worker extends EventEmitter {
271276
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
272277
workerData: options.workerData,
273278
environmentData,
274-
publicPort: port2,
275279
hasStdin: !!options.stdin,
280+
publicPort: publicPortToWorker,
281+
mainThreadPort: mainThreadPortToWorker,
276282
}, transferList);
277283
// Use this to cache the Worker's loopStart value once available.
278284
this[kLoopStartTime] = -1;
@@ -295,6 +301,7 @@ class Worker extends EventEmitter {
295301
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
296302
drainMessagePort(this[kPublicPort]);
297303
drainMessagePort(this[kPort]);
304+
destroyMainThreadPort(this.threadId);
298305
this.removeAllListeners('message');
299306
this.removeAllListeners('messageerrors');
300307
this[kPublicPort].unref();

0 commit comments

Comments
 (0)