Skip to content
24 changes: 24 additions & 0 deletions lib/internal/modules/cjs/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,28 @@ function reportModuleNotFoundToWatchMode(basePath, extensions) {
}
}

/**
* Tell the watch mode that a module was required, from within a worker thread.
* @param {string} filename Absolute path of the module
* @returns {void}
*/
function reportModuleToWatchModeFromWorker(filename) {
if (!shouldReportRequiredModules()) {
return;
}
const { isMainThread } = internalBinding('worker');
if (isMainThread) {
return;
}
// Lazy require to avoid circular dependency: worker_threads is loaded after
// the CJS loader is fully set up.
const { parentPort } = require('worker_threads');
if (!parentPort) {
return;
}
parentPort.postMessage({ 'watch:require': [filename] });
}

/**
* Create a new module instance.
* @param {string} id
Expand Down Expand Up @@ -1245,6 +1267,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty
relResolveCacheIdentifier = `${parent.path}\x00${request}`;
const filename = relativeResolveCache[relResolveCacheIdentifier];
reportModuleToWatchMode(filename);
reportModuleToWatchModeFromWorker(filename);
if (filename !== undefined) {
const cachedModule = Module._cache[filename];
if (cachedModule !== undefined) {
Expand Down Expand Up @@ -1335,6 +1358,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty
}

reportModuleToWatchMode(filename);
reportModuleToWatchModeFromWorker(filename);
Module._cache[filename] = module;
module[kIsCachedByESMLoader] = false;
// If there are resolve hooks, carry the context information into the
Expand Down
10 changes: 10 additions & 0 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,16 @@ class ModuleLoader {
const type = requestType === kRequireInImportedCJS ? 'require' : 'import';
process.send({ [`watch:${type}`]: [url] });
}
// Relay Events from worker to main thread
if (process.env.WATCH_REPORT_DEPENDENCIES && !process.send) {
const { isMainThread } = internalBinding('worker');
if (!isMainThread) {
const { parentPort } = require('worker_threads');
if (parentPort) {
parentPort.postMessage({ 'watch:import': [url] });
}
}
}

// TODO(joyeecheung): update the module requests to use importAttributes as property names.
const importAttributes = resolveResult.importAttributes ?? request.attributes;
Expand Down
13 changes: 13 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const {
ArrayIsArray,
ArrayPrototypeForEach,
ArrayPrototypeMap,
ArrayPrototypePush,
Expand Down Expand Up @@ -333,6 +334,18 @@ class Worker extends EventEmitter {
this[kPublicPort].on(event, (message) => this.emit(event, message));
});
setupPortReferencing(this[kPublicPort], this, 'message');

// Relay events from worker thread to watcher
if (process.env.WATCH_REPORT_DEPENDENCIES && process.send) {
this[kPublicPort].on('message', (message) => {
if (ArrayIsArray(message?.['watch:require'])) {
process.send({ 'watch:require': message['watch:require'] });
}
if (ArrayIsArray(message?.['watch:import'])) {
process.send({ 'watch:import': message['watch:import'] });
}
});
}
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
Expand Down
281 changes: 281 additions & 0 deletions test/sequential/test-watch-mode-worker.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
import * as common from '../common/index.mjs';
import tmpdir from '../common/tmpdir.js';
import assert from 'node:assert';
import path from 'node:path';
import { execPath } from 'node:process';
import { describe, it } from 'node:test';
import { spawn } from 'node:child_process';
import { writeFileSync, readFileSync } from 'node:fs';
import { inspect } from 'node:util';
import { pathToFileURL } from 'node:url';
import { createInterface } from 'node:readline';

if (common.isIBMi)
common.skip('IBMi does not support `fs.watch()`');

function restart(file, content = readFileSync(file)) {
writeFileSync(file, content);
const timer = setInterval(() => writeFileSync(file, content), common.platformTimeout(250));
return () => clearInterval(timer);
}

let tmpFiles = 0;
function createTmpFile(content = 'console.log(\'running\');', ext = '.js', basename = tmpdir.path) {
const file = path.join(basename, `${tmpFiles++}${ext}`);
writeFileSync(file, content);
return file;
}

async function runWriteSucceed({
file,
watchedFile,
watchFlag = '--watch',
args = [file],
completed = 'Completed running',
restarts = 2,
options = {},
shouldFail = false,
}) {
args.unshift('--no-warnings');
if (watchFlag !== null) args.unshift(watchFlag);

const child = spawn(execPath, args, { encoding: 'utf8', stdio: 'pipe', ...options });

let completes = 0;
let cancelRestarts = () => {};
let stderr = '';
const stdout = [];

child.stderr.on('data', (data) => {
stderr += data;
});

try {
for await (const data of createInterface({ input: child.stdout })) {
if (!data.startsWith('Waiting for graceful termination') &&
!data.startsWith('Gracefully restarted')) {
stdout.push(data);
}

if (data.startsWith(completed)) {
completes++;

if (completes === restarts) break;

if (completes === 1) {
cancelRestarts = restart(watchedFile);
}
}

if (!shouldFail && data.startsWith('Failed running')) break;
}
} finally {
child.kill();
cancelRestarts();
}

return { stdout, stderr, pid: child.pid };
}

tmpdir.refresh();
const dir = tmpdir.path;

describe('watch mode', { concurrency: !process.env.TEST_PARALLEL, timeout: 60_000 }, () => {
it('should watch changes to worker - cjs', async () => {
const worker = path.join(dir, 'worker.js');

writeFileSync(worker, `
console.log('worker running');
`);

const file = createTmpFile(`
const { Worker } = require('node:worker_threads');
const w = new Worker(${JSON.stringify(worker)});
`, '.js', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: worker,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'worker running',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'worker running',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});

it('should watch changes to worker dependencies - cjs', async () => {
const dep = path.join(dir, 'dep.js');
const worker = path.join(dir, 'worker.js');

writeFileSync(dep, `
module.exports = 'dep v1';
`);

writeFileSync(worker, `
const dep = require('./dep.js');
console.log(dep);
`);

const file = createTmpFile(`
const { Worker } = require('node:worker_threads');
const w = new Worker(${JSON.stringify(worker)});
`, '.js', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: dep,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});

it('should watch changes to nested worker dependencies - cjs', async () => {
const subDep = path.join(dir, 'sub-dep.js');
const dep = path.join(dir, 'dep.js');
const worker = path.join(dir, 'worker.js');

writeFileSync(subDep, `
module.exports = 'sub-dep v1';
`);

writeFileSync(dep, `
const subDep = require('./sub-dep.js');
console.log(subDep);
module.exports = 'dep v1';
`);

writeFileSync(worker, `
const dep = require('./dep.js');
`);

const file = createTmpFile(`
const { Worker } = require('node:worker_threads');
const w = new Worker(${JSON.stringify(worker)});
`, '.js', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: subDep,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'sub-dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'sub-dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});

it('should watch changes to worker - esm', async () => {
const worker = path.join(dir, 'worker.mjs');

writeFileSync(worker, `
console.log('worker running');
`);

const file = createTmpFile(`
import { Worker } from 'node:worker_threads';
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
`, '.mjs', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: worker,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'worker running',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'worker running',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});

it('should watch changes to worker dependencies - esm', async () => {
const dep = path.join(dir, 'dep.mjs');
const worker = path.join(dir, 'worker.mjs');

writeFileSync(dep, `
export default 'dep v1';
`);

writeFileSync(worker, `
import dep from ${JSON.stringify(pathToFileURL(dep))};
console.log(dep);
`);

const file = createTmpFile(`
import { Worker } from 'node:worker_threads';
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
`, '.mjs', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: dep,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});

it('should watch changes to nested worker dependencies - esm', async () => {
const subDep = path.join(dir, 'sub-dep.mjs');
const dep = path.join(dir, 'dep.mjs');
const worker = path.join(dir, 'worker.mjs');

writeFileSync(subDep, `
export default 'sub-dep v1';
`);

writeFileSync(dep, `
import subDep from ${JSON.stringify(pathToFileURL(subDep))};
console.log(subDep);
export default 'dep v1';
`);

writeFileSync(worker, `
import dep from ${JSON.stringify(pathToFileURL(dep))};
`);

const file = createTmpFile(`
import { Worker } from 'node:worker_threads';
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
`, '.mjs', dir);

const { stderr, stdout } = await runWriteSucceed({
file,
watchedFile: subDep,
});

assert.strictEqual(stderr, '');
assert.deepStrictEqual(stdout, [
'sub-dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
`Restarting ${inspect(file)}`,
'sub-dep v1',
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
]);
});
});