diff --git a/.changeset/dry-buttons-help.md b/.changeset/dry-buttons-help.md new file mode 100644 index 00000000..ec7844b9 --- /dev/null +++ b/.changeset/dry-buttons-help.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +ensure userdata is passed through when resampling diff --git a/.changeset/eleven-timers-kick.md b/.changeset/eleven-timers-kick.md new file mode 100644 index 00000000..a84a957f --- /dev/null +++ b/.changeset/eleven-timers-kick.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": minor +--- + +Add typeguards for frame processors in order to avoid dual package hazard diff --git a/packages/livekit-rtc/src/audio_frame.ts b/packages/livekit-rtc/src/audio_frame.ts index d8e7ab32..08c57d1b 100644 --- a/packages/livekit-rtc/src/audio_frame.ts +++ b/packages/livekit-rtc/src/audio_frame.ts @@ -32,9 +32,14 @@ export class AudioFrame { this._userdata = userdata; } - static create(sampleRate: number, channels: number, samplesPerChannel: number): AudioFrame { + static create( + sampleRate: number, + channels: number, + samplesPerChannel: number, + userdata?: Record<string, unknown>, + ): AudioFrame { const data = new Int16Array(channels * samplesPerChannel); - return new AudioFrame(data, sampleRate, channels, samplesPerChannel); + return new AudioFrame(data, sampleRate, channels, samplesPerChannel, userdata); } /** @internal */ @@ -103,5 +108,12 @@ export const combineAudioFrames = (buffer: AudioFrame | AudioFrame[]): AudioFram } const data = new Int16Array(buffer.map((x) => [...x.data]).flat()); - return new AudioFrame(data, sampleRate, channels, totalSamplesPerChannel); + + // Merge userdata from all frames + const mergedUserdata: Record<string, unknown> = {}; + for (const frame of buffer) { + Object.assign(mergedUserdata, frame.userdata); + } + + return new AudioFrame(data, sampleRate, channels, totalSamplesPerChannel, mergedUserdata); }; diff --git a/packages/livekit-rtc/src/audio_resampler.ts b/packages/livekit-rtc/src/audio_resampler.ts index 82dfe0fc..4468bea5 100644 ---
a/packages/livekit-rtc/src/audio_resampler.ts +++ b/packages/livekit-rtc/src/audio_resampler.ts @@ -131,12 +131,14 @@ export class AudioResampler { } const outputData = FfiClient.instance.copyBuffer(res.outputPtr, res.size!); + return [ new AudioFrame( new Int16Array(outputData.buffer), this.#outputRate, this.#channels, Math.trunc(outputData.length / this.#channels / 2), + data.userdata, ), ]; } diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index c136e098..94181cb1 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -5,7 +5,7 @@ import type { UnderlyingSource } from 'node:stream/web'; import { AudioFrame } from './audio_frame.js'; import type { FfiEvent } from './ffi_client.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; -import { FrameProcessor } from './frame_processor.js'; +import { type FrameProcessor, isAudioFrameProcessor } from './frame_processor.js'; import { log } from './log.js'; import type { NewAudioStreamResponse } from './proto/audio_frame_pb.js'; import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js'; @@ -41,7 +41,7 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> { if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') { this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000; this.numChannels = sampleRateOrOptions.numChannels ??
1; - if (sampleRateOrOptions.noiseCancellation instanceof FrameProcessor) { + if (isAudioFrameProcessor(sampleRateOrOptions.noiseCancellation)) { this.frameProcessor = sampleRateOrOptions.noiseCancellation; } else { this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; diff --git a/packages/livekit-rtc/src/frame_processor.ts b/packages/livekit-rtc/src/frame_processor.ts index 70f6be11..d8f4b2f9 100644 --- a/packages/livekit-rtc/src/frame_processor.ts +++ b/packages/livekit-rtc/src/frame_processor.ts @@ -15,7 +15,42 @@ export type FrameProcessorCredentials = { url: string; }; +export const FrameProcessorSymbol = Symbol.for('lk.frame-processor'); + +export function isFrameProcessor<Type extends 'audio' | 'video'>( + maybeProcessor: unknown, + type?: Type, +): maybeProcessor is FrameProcessor< + Type extends 'audio' ? AudioFrame : Type extends 'video' ? VideoFrame : AudioFrame | VideoFrame +> { + return ( + maybeProcessor !== null && + typeof maybeProcessor === 'object' && + 'symbol' in maybeProcessor && + maybeProcessor.symbol === FrameProcessorSymbol && + (!type || ('type' in maybeProcessor && maybeProcessor.type === type)) + ); +} + +export function isAudioFrameProcessor( + maybeProcessor: unknown, +): maybeProcessor is FrameProcessor<AudioFrame> { + return isFrameProcessor(maybeProcessor, 'audio'); +} + +export function isVideoFrameProcessor( + maybeProcessor: unknown, +): maybeProcessor is FrameProcessor<VideoFrame> { + return isFrameProcessor(maybeProcessor, 'video'); +} + export abstract class FrameProcessor<Frame extends AudioFrame | VideoFrame> { + readonly symbol = FrameProcessorSymbol; + abstract readonly type: Frame extends VideoFrame + ? 'video' + : Frame extends AudioFrame + ? 'audio' + : never; abstract isEnabled(): boolean; abstract setEnabled(enabled: boolean): void;