Improve logging in working threads #47923

Closed as not planned
@jimmywarting

I basically had something along the lines of the following:

// Create a shared buffer
const sharedBuff = new SharedArrayBuffer(1024)
const sharedInt32Array = new Int32Array(sharedBuff)

// Create a worker
const worker = createWorker(sharedBuff) 

// Initiate work
worker.postMessage('do_work')

// Block the main thread and wait for the job to complete
Atomics.wait(sharedInt32Array, a, b)

// print done
console.log('complete')

and the worker does something like:

onmessage = evt => {
  if (evt.data === 'do_work') {
    console.log('starting doing work')
    Atomics.notify(...)
  }
}

What this boils down to is that you get console output that looks something like:

complete
starting doing work

which is in completely the wrong order, because the main thread is blocked:
the worker does its work and calls console.log before the main thread can continue.

It should have been:

starting doing work
complete
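
For anyone who wants to try this, here is a minimal single-file sketch of the scenario above. It is an assumption-laden reconstruction, not the original code: the createWorker helper is replaced by new Worker(..., { eval: true }), the names workerSource, flagBuffer and flag are made up for illustration, and the wait index, expected value and 5 second timeout are picked arbitrarily. It is written with require so it can be run as a plain script.

```javascript
const { Worker } = require('node:worker_threads')

// Worker body, run with eval: true so the whole sketch fits in one file.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  const flag = new Int32Array(workerData.flagBuffer)

  parentPort.once('message', msg => {
    if (msg === 'do_work') {
      console.log('starting doing work') // goes through the default worker stdio
      Atomics.store(flag, 0, 1)          // mark the job as done
      Atomics.notify(flag, 0)            // wake the blocked main thread
    }
  })
`

// Shared flag: 0 = job pending, 1 = job done.
const flagBuffer = new SharedArrayBuffer(4)
const flag = new Int32Array(flagBuffer)

const worker = new Worker(workerSource, { eval: true, workerData: { flagBuffer } })
worker.postMessage('do_work')

// Block the main thread until the worker flips the flag (give up after 5 s).
Atomics.wait(flag, 0, 0, 5000)

console.log('complete')
```

With the default worker stdio, this should reproduce the ordering described above, since the worker's log line can only be printed once the main thread gets back to its event loop.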

My suspicion was that you send postMessages back to the main thread and then forward them to the main thread's process.stdout / process.stderr.

And I was right...

class WritableWorkerStdio extends Writable {
  constructor(port, name) {
    super({ decodeStrings: false });
    this[kPort] = port;
    this[kName] = name;
    this[kWritableCallbacks] = [];
  }
  _writev(chunks, cb) {
    this[kPort].postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunks: ArrayPrototypeMap(chunks,
                                ({ chunk, encoding }) => ({ chunk, encoding })),
    });
    ArrayPrototypePush(this[kWritableCallbacks], cb);
    if (this[kPort][kWaitingStreams]++ === 0)
      this[kPort].ref();
  }
  _final(cb) {
    this[kPort].postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunks: [ { chunk: null, encoding: '' } ],
    });
    cb();
  }
  [kStdioWantsMoreDataCallback]() {
    const cbs = this[kWritableCallbacks];
    this[kWritableCallbacks] = [];
    ArrayPrototypeForEach(cbs, (cb) => cb());
    if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
      this[kPort].unref();
  }
}

} else if (message.type === STDIO_PAYLOAD) {
  const { stream, chunks } = message;
  ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
    process[stream].push(chunk, encoding);
  });
} else {
This wasn't very good or effective,
because it made it hard to debug the worker with console logs while the main thread was blocked.
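
The underlying effect can be shown without a worker at all. The sketch below is only an illustration of the mechanism, not Node's internal code: it assumes a plain MessageChannel stands in for the internal stdio port, and the names events and blocker are invented. A message posted to a port is not delivered while the receiving thread is stuck in synchronous code; it only arrives once that thread returns to its event loop.

```javascript
const { MessageChannel } = require('node:worker_threads')

const { port1, port2 } = new MessageChannel()
const events = []

// Stand-in for the main thread receiving a forwarded stdio chunk.
port2.once('message', chunk => {
  events.push('forwarded: ' + chunk)
  port2.close()       // close the channel so the process can exit
  console.log(events) // the forwarded entry comes after 'complete'
})

// Stand-in for the worker's console.log being turned into a message.
port1.postMessage('starting doing work')

// Block this thread for about 100 ms; nothing notifies, so the wait times out.
const blocker = new Int32Array(new SharedArrayBuffer(4))
Atomics.wait(blocker, 0, 0, 100)

// The message was posted long ago, but the listener has not run yet.
events.push('complete')
```

The listener only runs after the synchronous part of the script has finished, which is the same reason the worker's log line shows up after 'complete'.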


But I managed to find a solution that worked out okay in the end:
instead of using postMessage, I simply passed the main process.stdout.fd and process.stderr.fd to the worker thread and wrote data directly to those file descriptors, rather than sending postMessages back and forth (which is unnecessary and complicated).


So here is roughly my workaround:

import { workerData } from 'node:worker_threads'
import { writeSync } from 'node:fs'
import { Console } from 'node:console'
import { Writable } from 'node:stream'

const { main_stdout_fd, main_stderr_fd } = workerData

const stdio = fd => Writable.fromWeb(new WritableStream({
  write (chunk) {
    writeSync(fd, chunk)
  }
}))

globalThis.console = new Console(stdio(main_stdout_fd), stdio(main_stderr_fd))
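
The snippet above is only the worker side. A possible end-to-end sketch, including the main thread that hands over the file descriptors through workerData, could look like the following. Several things here are assumptions rather than the original code: the worker is created with eval: true to keep it in one file, runBlockingJob, workerSource and flagBuffer are hypothetical names, and a plain Writable with a synchronous write is swapped in for Writable.fromWeb so that the write happens inside the console.log call itself.

```javascript
const { Worker } = require('node:worker_threads')

// Worker body: replaces console with one that writes straight to the given fds.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  const { writeSync } = require('node:fs')
  const { Console } = require('node:console')
  const { Writable } = require('node:stream')

  const { flagBuffer, main_stdout_fd, main_stderr_fd } = workerData

  // Plain Writable (not Writable.fromWeb): writes synchronously to the fd.
  const stdio = fd => new Writable({
    write (chunk, encoding, callback) {
      writeSync(fd, chunk)
      callback()
    }
  })

  globalThis.console = new Console(stdio(main_stdout_fd), stdio(main_stderr_fd))

  parentPort.once('message', msg => {
    if (msg === 'do_work') {
      console.log('starting doing work')
      const flag = new Int32Array(flagBuffer)
      Atomics.store(flag, 0, 1) // mark the job as done
      Atomics.notify(flag, 0)   // wake the blocked main thread
    }
  })
`

// Hypothetical helper: starts the worker, blocks until it signals completion
// (or 5 s pass), and reports whether the job finished.
function runBlockingJob (stdoutFd, stderrFd) {
  const flagBuffer = new SharedArrayBuffer(4)
  const flag = new Int32Array(flagBuffer)
  const worker = new Worker(workerSource, {
    eval: true,
    workerData: { flagBuffer, main_stdout_fd: stdoutFd, main_stderr_fd: stderrFd }
  })
  worker.postMessage('do_work')
  Atomics.wait(flag, 0, 0, 5000)
  return Atomics.load(flag, 0) === 1
}

const finished = runBlockingJob(process.stdout.fd, process.stderr.fd)
console.log(finished ? 'complete' : 'timed out')
```

File descriptors are plain integers that are valid across all threads of the process, so nothing actually needs to be transferred; passing the numbers in workerData is enough.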

The logs will be in the correct order,
and it does not need any complicated postMessage protocol like:

     this[kPort].postMessage({ 
       type: messageTypes.STDIO_PAYLOAD, 
       stream: this[kName], 

So my proposal is that you write directly to the main stdio file descriptors instead.
