Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
- `worker_threads`: Runs workers in [`node:worker_threads`](https://nodejs.org/api/worker_threads.html). For `main thread <-> worker thread` communication you can use [`MessagePort`](https://nodejs.org/api/worker_threads.html#class-messageport) in the `pool.run()` method's [`transferList` option](https://nodejs.org/api/worker_threads.html#portpostmessagevalue-transferlist). See [example](#main-thread---worker-thread-communication).
- `child_process`: Runs workers in [`node:child_process`](https://nodejs.org/api/child_process.html). For `main thread <-> worker process` communication you can use `TinypoolChannel` in the `pool.run()` method's `channel` option. For filtering out the Tinypool's internal messages see `TinypoolWorkerMessage`. See [example](#main-process---worker-process-communication).
- `teardown`: name of the function in file that should be called before worker is terminated. Must be named exported.
- `serialization`: Specify the kind of serialization used for the `child_process` runtime. Possible values are `'json'` and `'advanced'`. See Node.js [Advanced serialization](https://nodejs.org/docs/latest/api/child_process.html#advanced-serialization) for more details.

#### Pool methods

Expand Down
2 changes: 2 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { MessagePort, TransferListItem } from 'node:worker_threads'
import type { SerializationType } from 'node:child_process'

/** Channel for communicating between main thread and workers */
export interface TinypoolChannel {
Expand All @@ -21,6 +22,7 @@ export interface TinypoolWorker {
resourceLimits?: any
workerData: TinypoolData
trackUnmanagedFds?: boolean
serialization?: SerializationType
Comment thread
claneo marked this conversation as resolved.
}): void
terminate(): Promise<any>
postMessage(message: any, transferListItem?: TransferListItem[]): void
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
type MessagePort,
receiveMessageOnPort,
} from 'node:worker_threads'
import type { SerializationType } from 'node:child_process'
import { once, EventEmitterAsyncResource } from 'node:events'
import { AsyncResource } from 'node:async_hooks'
import { fileURLToPath, URL } from 'node:url'
Expand Down Expand Up @@ -153,6 +154,7 @@ interface Options {
trackUnmanagedFds?: boolean
isolateWorkers?: boolean
teardown?: string
serialization?: SerializationType
Comment thread
AriPerkkio marked this conversation as resolved.
}

interface FilledOptions extends Options {
Expand Down Expand Up @@ -735,6 +737,7 @@ class ThreadPool {
this.options.workerData,
] as TinypoolData,
trackUnmanagedFds: this.options.trackUnmanagedFds,
serialization: this.options.serialization,
})

const onMessage = (message: ResponseMessage) => {
Expand Down
6 changes: 3 additions & 3 deletions test/fixtures/child_process-communication.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
export default async function run() {
export default async function run(task) {
let resolve = () => {}
const promise = new Promise((r) => (resolve = r))

process.send('Child process started')

process.on('message', (message) => {
process.send({ received: message, response: 'Hello from worker' })
resolve()
resolve({ received: task, response: 'Hello from worker' })
})

await promise
return promise
}
63 changes: 62 additions & 1 deletion test/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import EventEmitter from 'node:events'
import * as path from 'node:path'
import { fileURLToPath } from 'node:url'
import { Tinypool } from 'tinypool'
import EventEmitter from 'node:events'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

Expand Down Expand Up @@ -199,6 +199,67 @@ describe('child_process', () => {
})
})

test('can send complex messages to port', async () => {
const pool = createPool({
runtime: 'child_process',
filename: path.resolve(
__dirname,
'fixtures/child_process-communication.mjs'
),
serialization: 'advanced',
})

const complexData = {
bigint: 123456789123456789n,
map: new Map([['hello', 'world']]),
set: new Set(['hello', 'world']),
error: new Error('message'),
regexp: /regexp/,
}

const emitter = new EventEmitter()

const startup = new Promise<void>((resolve) =>
emitter.on(
'response',
(message) => message === 'Child process started' && resolve()
)
)

const runPromise = pool.run(complexData, {
channel: {
onMessage: (callback) => emitter.on('message', callback),
postMessage: (message) => emitter.emit('response', message),
},
})

// Wait for the child process to start
await startup

const response = new Promise<any>((resolve) =>
emitter.on('response', (message) => resolve(message))
)

// Send message to child process
emitter.emit('message', complexData)

// Wait for task to finish
const runResult = await runPromise

expect(runResult).toMatchObject({
received: complexData,
response: 'Hello from worker',
})

// Wait for response from child
const channelResult = await response

expect(channelResult).toMatchObject({
received: complexData,
response: 'Hello from worker',
})
})

test('channel is closed when isolated', async () => {
const pool = createPool({
runtime: 'child_process',
Expand Down