From 01b01bea28b3806bbdded687579ea524a9c93119 Mon Sep 17 00:00:00 2001 From: Mark Gulbrandsen Date: Mon, 9 Mar 2026 20:37:37 -0700 Subject: [PATCH 1/4] feat(duplex): add synchronized duplex stream support for CoreAudio --- CHANGELOG.md | 5 + Cargo.toml | 3 + README.md | 3 +- examples/duplex_feedback.rs | 104 ++++++ src/duplex.rs | 79 ++++ src/host/coreaudio/macos/device.rs | 561 ++++++++++++++++++++++++++++- src/host/coreaudio/macos/mod.rs | 52 ++- src/lib.rs | 1 + src/platform/mod.rs | 23 ++ src/traits.rs | 79 +++- 10 files changed, 903 insertions(+), 7 deletions(-) create mode 100644 examples/duplex_feedback.rs create mode 100644 src/duplex.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index c78c8a3a1..277139ba7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`. - **PipeWire**: New host for Linux and some BSDs using the PipeWire API. - **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API. +- `DeviceTrait::build_duplex_stream` and `build_duplex_stream_raw` for synchronized input/output. +- `duplex` module with `DuplexStreamConfig` and `DuplexCallbackInfo` types. +- **CoreAudio**: Duplex stream support with hardware-synchronized input/output. +- Example `duplex_feedback` demonstrating duplex stream usage. ### Changed @@ -91,6 +95,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **POTENTIALLY BREAKING**: `DeviceTrait` now includes `build_duplex_stream()` and `build_duplex_stream_raw()` methods. The default implementation returns `StreamConfigNotSupported`, so external implementations are compatible without changes. - Bump overall MSRV to 1.78. - **ALSA**: Update `alsa` dependency to 0.11. - **ALSA**: Bump MSRV to 1.82. 
diff --git a/Cargo.toml b/Cargo.toml index 8e28c9ca5..41ac7d253 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,6 +201,9 @@ name = "record_wav" [[example]] name = "synth_tones" +[[example]] +name = "duplex_feedback" + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 2badf60d3..6523935a7 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ This library currently supports the following: - Enumerate known supported input and output stream formats for a device. - Get the current default input and output stream formats for a device. - Build and run input and output PCM streams on a chosen device with a given stream format. +- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization (macOS only, more platforms coming soon). Currently, supported platforms include: @@ -174,7 +175,7 @@ If you are unable to build the library: ## Examples -CPAL comes with several examples in `examples/`. +CPAL comes with several examples in `examples/`, including `duplex_feedback` for hardware-synchronized duplex streams. Run an example with: ```bash diff --git a/examples/duplex_feedback.rs b/examples/duplex_feedback.rs new file mode 100644 index 000000000..c3533a318 --- /dev/null +++ b/examples/duplex_feedback.rs @@ -0,0 +1,104 @@ +//! Feeds back the input stream directly into the output stream using a duplex stream. +//! +//! Unlike the `feedback.rs` example which uses separate input/output streams with a ring buffer, +//! duplex streams provide hardware-synchronized input/output without additional buffering. +//! +//! Note: Currently only supported on macOS (CoreAudio). Windows (WASAPI) and Linux (ALSA) +//! implementations are planned. 
+ +#[cfg(target_os = "macos")] +mod imp { + use clap::Parser; + use cpal::duplex::DuplexStreamConfig; + use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + use cpal::BufferSize; + + #[derive(Parser, Debug)] + #[command(version, about = "CPAL duplex feedback example", long_about = None)] + struct Opt { + /// The audio device to use (must support duplex operation) + #[arg(short, long, value_name = "DEVICE")] + device: Option, + + /// Number of input channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + input_channels: u16, + + /// Number of output channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + output_channels: u16, + + /// Sample rate in Hz + #[arg(short, long, value_name = "RATE", default_value_t = 48000)] + sample_rate: u32, + + /// Buffer size in frames + #[arg(short, long, value_name = "FRAMES", default_value_t = 512)] + buffer_size: u32, + } + + pub fn run() -> anyhow::Result<()> { + let opt = Opt::parse(); + let host = cpal::default_host(); + + // Find the device by device ID or use default + let device = if let Some(device_id_str) = opt.device { + let device_id = device_id_str.parse().expect("failed to parse device id"); + host.device_by_id(&device_id) + .unwrap_or_else(|| panic!("failed to find device with id: {}", device_id_str)) + } else { + host.default_output_device() + .expect("no default output device") + }; + + println!("Using device: \"{}\"", device.description()?.name()); + + // Create duplex stream configuration. 
+ let config = DuplexStreamConfig { + input_channels: opt.input_channels, + output_channels: opt.output_channels, + sample_rate: opt.sample_rate, + buffer_size: BufferSize::Fixed(opt.buffer_size), + }; + + println!("Building duplex stream with config: {config:?}"); + + let stream = device.build_duplex_stream::( + &config, + move |input, output, _info| { + output.fill(0.0); + let copy_len = input.len().min(output.len()); + output[..copy_len].copy_from_slice(&input[..copy_len]); + }, + |err| eprintln!("Stream error: {err}"), + None, + )?; + + println!("Successfully built duplex stream."); + println!( + "Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {} frames", + opt.input_channels, opt.output_channels, opt.sample_rate, opt.buffer_size + ); + + println!("Starting duplex stream..."); + stream.play()?; + + println!("Playing for 10 seconds... (speak into your microphone)"); + std::thread::sleep(std::time::Duration::from_secs(10)); + + drop(stream); + println!("Done!"); + Ok(()) + } +} + +fn main() { + #[cfg(target_os = "macos")] + imp::run().unwrap(); + + #[cfg(not(target_os = "macos"))] + { + eprintln!("Duplex streams are currently only supported on macOS."); + eprintln!("Windows (WASAPI) and Linux (ALSA) support is planned."); + } +} diff --git a/src/duplex.rs b/src/duplex.rs new file mode 100644 index 000000000..f05b9e371 --- /dev/null +++ b/src/duplex.rs @@ -0,0 +1,79 @@ +//! Duplex audio stream support with synchronized input/output. +//! +//! This module provides types for building duplex (simultaneous input/output) audio streams +//! with hardware clock synchronization. +//! +//! # Overview +//! +//! Unlike separate input and output streams which may have independent clocks, a duplex stream +//! uses a single device context for both input and output, ensuring they share the same +//! hardware clock. This is essential for applications like: +//! +//! - DAWs (Digital Audio Workstations) +//! - Real-time audio effects processing +//! 
- Audio measurement and analysis +//! - Any application requiring sample-accurate I/O synchronization +//! +//! See `examples/duplex_feedback.rs` for a working example. + +use crate::{InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; + +/// Information passed to duplex callbacks. +/// +/// This contains timing information for the current audio buffer, combining +/// both input and output timing. A duplex stream has a single callback invocation +/// that provides synchronized input and output data. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct DuplexCallbackInfo { + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, +} + +impl DuplexCallbackInfo { + /// Create a new DuplexCallbackInfo. + /// + /// Note: Both timestamps will share the same `callback` instant since there is + /// only one callback invocation for a duplex stream. + pub fn new( + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, + ) -> Self { + Self { + input_timestamp, + output_timestamp, + } + } + + /// The timestamp for the input portion of the duplex stream. + /// + /// Contains the callback instant and when the input data was captured. + pub fn input_timestamp(&self) -> InputStreamTimestamp { + self.input_timestamp + } + + /// The timestamp for the output portion of the duplex stream. + /// + /// Contains the callback instant and when the output data will be played. + pub fn output_timestamp(&self) -> OutputStreamTimestamp { + self.output_timestamp + } +} + +/// Configuration for a duplex audio stream. +/// +/// Unlike separate input/output streams, duplex streams require matching +/// configuration for both directions since they share a single device context. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DuplexStreamConfig { + /// Number of input channels. + pub input_channels: u16, + + /// Number of output channels. + pub output_channels: u16, + + /// Sample rate in Hz. 
+ pub sample_rate: SampleRate, + + /// Requested buffer size in frames. + pub buffer_size: crate::BufferSize, +} diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 7267d9d50..0ee422445 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -1,6 +1,10 @@ use super::OSStatus; use super::Stream; -use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant}; +use super::{ + asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant, + DuplexCallbackPtr, +}; +use crate::duplex::DuplexCallbackInfo; use crate::host::coreaudio::macos::loopback::LoopbackDevice; use crate::host::coreaudio::macos::StreamInner; use crate::traits::DeviceTrait; @@ -14,7 +18,9 @@ use coreaudio::audio_unit::render_callback::{self, data}; use coreaudio::audio_unit::{AudioUnit, Element, Scope}; use objc2_audio_toolbox::{ kAudioOutputUnitProperty_CurrentDevice, kAudioOutputUnitProperty_EnableIO, - kAudioUnitProperty_StreamFormat, + kAudioUnitErr_TooManyFramesToProcess, kAudioUnitProperty_SetRenderCallback, + kAudioUnitProperty_StreamFormat, AURenderCallbackStruct, AudioUnitRender, + AudioUnitRenderActionFlags, }; use objc2_core_audio::kAudioDevicePropertyDeviceUID; use objc2_core_audio::kAudioObjectPropertyElementMain; @@ -30,7 +36,8 @@ use objc2_core_audio::{ AudioObjectPropertyScope, AudioObjectSetPropertyData, }; use objc2_core_audio_types::{ - AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange, + kAudio_ParamError, AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioTimeStamp, + AudioValueRange, }; use objc2_core_foundation::CFString; use objc2_core_foundation::Type; @@ -336,6 +343,21 @@ impl DeviceTrait for Device { timeout, ) } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where 
+ D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Device::build_duplex_stream_raw(self, config, sample_format, data_callback, error_callback) + } } #[derive(Clone, Eq, Hash, PartialEq)] @@ -822,6 +844,7 @@ impl Device { audio_unit, device_id: self.audio_device_id, _loopback_device: loopback_aggregate, + duplex_callback_ptr: None, }, error_callback_for_stream, )?; @@ -922,6 +945,7 @@ impl Device { audio_unit, device_id: self.audio_device_id, _loopback_device: None, + duplex_callback_ptr: None, }, error_callback_for_stream, )?; @@ -939,6 +963,464 @@ impl Device { Ok(stream) } + + /// Calculate latency-adjusted capture and playback timestamps with graceful error handling. + /// + /// Note: input/output streams use .expect() here and will panic - while it is + /// inconsistent, I believe it is safer for duplex to handle gracefully on the real-time + /// audio thread. Errors here should be extremely rare (timestamp arithmetic + /// overflow/underflow). + /// + /// While theoretically and probably practically impossible, this is a critical + /// user-experience issue -- the app shouldn't crash and potentially lose end-user data. + /// The only reason `sub` would overflow is if delay is massively too large, which would + /// indicate some kind of major bug or failure in the OS since callback_instant is derived + /// from host time. Still, I think the best practice is NOT to panic but tell the app + /// and fallback to a degraded latency estimate of no latency. 
+ fn calculate_duplex_timestamps( + callback_instant: crate::StreamInstant, + delay: std::time::Duration, + error_callback: &Arc>, + ) -> (crate::StreamInstant, crate::StreamInstant) + where + E: FnMut(StreamError) + Send + 'static, + { + let capture = match callback_instant.sub(delay) { + Some(c) => c, + None => { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp underflow calculating capture time".to_string(), + }, + }, + ); + callback_instant + } + }; + + let playback = match callback_instant.add(delay) { + Some(p) => p, + None => { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp overflow calculating playback time".to_string(), + }, + }, + ); + callback_instant + } + }; + + (capture, playback) + } + + /// Build a duplex stream with synchronized input and output. + /// + /// This creates a single HAL AudioUnit with both input and output enabled, + /// ensuring they share the same hardware clock. + /// For details, see: https://developer.apple.com/library/archive/technotes/tn2091/_index.html + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // Validate that device supports duplex + if !self.supports_input() || !self.supports_output() { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + // Potentially change the device sample rate to match the config. 
+ set_sample_rate(self.audio_device_id, config.sample_rate)?; + + // Create HAL AudioUnit - always use HalOutput for duplex + let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; + + // Enable BOTH input and output on the AudioUnit + let enable: u32 = 1; + + // Enable input on Element 1 + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Input, + Element::Input, + Some(&enable), + )?; + + // Enable output on Element 0 (usually enabled by default, but be explicit) + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Output, + Element::Output, + Some(&enable), + )?; + + // Set device for the unit (applies to both input and output) + audio_unit.set_property( + kAudioOutputUnitProperty_CurrentDevice, + Scope::Global, + Element::Output, + Some(&self.audio_device_id), + )?; + + // Create StreamConfig for input side + let input_stream_config = StreamConfig { + channels: config.input_channels as ChannelCount, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Create StreamConfig for output side + let output_stream_config = StreamConfig { + channels: config.output_channels as ChannelCount, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Core Audio's HAL AU has two buses, each with a hardware side and a + // client (app) side. We set the stream format on the client-facing side; + // the AU's built-in converter handles translation to/from the hardware format. + // + // Mic ─[Scope::Input]──▶ Input Bus ──[Scope::Output]─▶ App + // (hardware side) (client side) + // + // App ─[Scope::Input]──▶ Output Bus ─[Scope::Output]─▶ Speaker + // (client side) (hardware side) + // + // So the client side is Scope::Output for the input bus (where we read + // captured samples) and Scope::Input for the output bus (where we write + // playback samples). See Apple TN2091 for details. 
+ let input_asbd = asbd_from_config(&input_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Output, + Element::Input, + Some(&input_asbd), + )?; + + let output_asbd = asbd_from_config(&output_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Input, + Element::Output, + Some(&output_asbd), + )?; + + // Buffer frame size is a device-level property. Element::Output (0) is + // the standard convention for Scope::Global properties. + if let BufferSize::Fixed(buffer_size) = &config.buffer_size { + audio_unit.set_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + Some(buffer_size), + )?; + } + + // Query the buffer size range to determine the maximum possible buffer size. + // + // BUFFER ALLOCATION STRATEGY: + // Duplex streams must pre-allocate an input buffer because AudioUnitRender() requires + // us to provide the buffer. Unlike input/output streams where CoreAudio provides the + // buffer, we cannot dynamically reallocate in the callback without: + // 1. Violating real-time constraints (allocation is not RT-safe) + // 2. Implementing complex double/triple buffering (not worth it for rare edge case) + // + // Therefore, we allocate for the MAXIMUM buffer size reported by the device. This + // handles dynamic buffer size changes (e.g., due to sample rate changes or system + // audio settings) transparently and efficiently. + // + // LIMITATION: + // If CoreAudio requests more frames than the device's reported maximum buffer size, + // the stream will fail with kAudioUnitErr_TooManyFramesToProcess. 
This can happen if: + // - The device's buffer size is changed after stream creation (via System Settings + // or another audio application) + // - The device reconfigures itself (e.g., joins/leaves an aggregate device) + // - The device's supported buffer size range changes due to sample rate or format + // changes + // Recovery requires the user to destroy and recreate the stream, which will query + // the new buffer size range and allocate appropriately. + let buffer_size_range = get_io_buffer_frame_size_range(&audio_unit)?; + let max_buffer_size = match buffer_size_range { + crate::SupportedBufferSize::Range { max, .. } => max, + crate::SupportedBufferSize::Unknown => { + // Fallback: query current size and add headroom for safety + let current: u32 = audio_unit + .get_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + ) + .unwrap_or(512); + // Add 2x headroom when range is unknown to handle potential size increases + current * 2 + } + }; + + // Get callback vars for latency calculation (matching input/output pattern) + let sample_rate = config.sample_rate; + let device_buffer_frames = get_device_buffer_frame_size(&audio_unit).ok(); + + // Get the raw AudioUnit pointer for use in the callback + let raw_audio_unit = *audio_unit.as_ref(); + + // Configuration for callback + let input_channels = config.input_channels as usize; + let sample_bytes = sample_format.sample_size(); + + // Pre-allocate input buffer for the maximum buffer size (in bytes). + // This ensures we can handle dynamic buffer size changes without reallocation. + // See the BUFFER ALLOCATION STRATEGY comment above for detailed rationale. 
+ let input_buffer_samples = max_buffer_size as usize * input_channels; + let input_buffer_bytes = input_buffer_samples * sample_bytes; + let mut input_buffer: Vec = vec![0u8; input_buffer_bytes]; + + // Wrap error callback in Arc for sharing between callback and disconnect handler + let error_callback = Arc::new(Mutex::new(error_callback)); + let error_callback_for_callback = error_callback.clone(); + + // Move data callback into closure + let mut data_callback = data_callback; + + let duplex_proc: Box = Box::new( + move |io_action_flags: NonNull, + in_time_stamp: NonNull, + _in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList| + -> i32 { + // Validate output buffer structure early to avoid wasted work + if io_data.is_null() { + return kAudio_ParamError; + } + // SAFETY: io_data validated as non-null above + let buffer_list = unsafe { &mut *io_data }; + if buffer_list.mNumberBuffers == 0 { + return kAudio_ParamError; + } + + let num_frames = in_number_frames as usize; + let input_samples = num_frames * input_channels; + let input_bytes = input_samples * sample_bytes; + + // Bounds check: ensure the requested frames don't exceed our pre-allocated buffer. + // This buffer is allocated for the maximum buffer size reported by the device, + // so this check should only fail if CoreAudio requests more frames than the + // device's reported maximum (which would indicate a serious driver/OS issue). + if input_bytes > input_buffer.len() { + invoke_error_callback( + &error_callback_for_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!( + "CoreAudio requested {} frames ({} bytes) but the input buffer is \ + only {} bytes (allocated for the device's maximum buffer size at \ + stream creation). This likely means the device's buffer size or \ + configuration changed after the stream was created. 
To recover, \ + destroy and recreate the stream.", + num_frames, input_bytes, input_buffer.len() + ), + }, + }, + ); + // Return error - this is a persistent failure that will happen every callback + // until the user destroys and recreates the stream + return kAudioUnitErr_TooManyFramesToProcess; + } + + // SAFETY: in_time_stamp is valid per CoreAudio contract + let timestamp: &AudioTimeStamp = unsafe { in_time_stamp.as_ref() }; + + // Create StreamInstant for callback_instant + let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { + Err(err) => { + invoke_error_callback(&error_callback_for_callback, err.into()); + // Return 0 (noErr) to keep the stream alive while notifying the error + // callback. This matches input/output stream behavior and allows graceful + // degradation rather than stopping the stream on transient errors. + return 0; + } + Ok(cb) => cb, + }; + + let buffer = &mut buffer_list.mBuffers[0]; + if buffer.mData.is_null() { + return kAudio_ParamError; + } + let output_samples = buffer.mDataByteSize as usize / sample_bytes; + + // SAFETY: buffer.mData validated as non-null above + let mut output_data = unsafe { + Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) + }; + + // Calculate latency-adjusted timestamps (matching input/output pattern) + let buffer_frames = num_frames; + // Use device buffer size for latency calculation if available + let latency_frames = device_buffer_frames.unwrap_or( + // Fallback to callback buffer size if device buffer size is unknown + buffer_frames, + ); + let delay = frames_to_duration(latency_frames, sample_rate); + + let (capture, playback) = Self::calculate_duplex_timestamps( + callback_instant, + delay, + &error_callback_for_callback, + ); + + // Create callback info with latency-adjusted times + let input_timestamp = crate::InputStreamTimestamp { + callback: callback_instant, + capture, + }; + let output_timestamp = crate::OutputStreamTimestamp { + callback: 
callback_instant, + playback, + }; + + // Pull input from Element 1 using AudioUnitRender + // use the pre-allocated input_buffer + // Set up AudioBufferList pointing to our input buffer + let mut input_buffer_list = AudioBufferList { + mNumberBuffers: 1, + mBuffers: [AudioBuffer { + mNumberChannels: input_channels as u32, + mDataByteSize: input_bytes as u32, + mData: input_buffer.as_mut_ptr() as *mut std::ffi::c_void, + }], + }; + + // SAFETY: AudioUnitRender is called with valid parameters: + // - raw_audio_unit is valid for the callback duration + // - input_buffer_list is created just above on the stack with a pointer to + // input_buffer, which is properly aligned (Rust's standard allocators return + // pointers aligned to at least 8 bytes, exceeding f32/i16 requirements) + // - input_buffer has been bounds-checked above to ensure sufficient capacity + // - All other parameters (timestamps, flags, etc.) come from CoreAudio itself + let status = unsafe { + AudioUnitRender( + raw_audio_unit, + io_action_flags.as_ptr(), + in_time_stamp, + 1, // Element 1 = input + in_number_frames, + NonNull::new_unchecked(&mut input_buffer_list), + ) + }; + + if status != 0 { + // Report error but continue with silence for graceful degradation + // The application should decide what to do. 
+ invoke_error_callback( + &error_callback_for_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!( + "AudioUnitRender failed for input: OSStatus {}", + status + ), + }, + }, + ); + input_buffer[..input_bytes].fill(0); + } + + // SAFETY: Creating Data from input_buffer is safe because: + // - input_buffer is a valid Vec owned by this closure + // - input_samples (num_frames * input_channels) was bounds-checked above to ensure + // input_samples * sample_bytes <= input_buffer.len() + // - AudioUnitRender just filled the buffer with valid audio data (or we filled + // it with silence on error) + // - The Data lifetime is scoped to this callback and doesn't outlive input_buffer + // - The pointer is suitably aligned: We successfully passed this buffer to + // AudioUnitRender (line 1314), which requires properly aligned buffers and would + // have failed if alignment were incorrect. Additionally, in practice, Rust's + // standard allocators (System, jemalloc, etc.) return pointers aligned to at + // least 8 bytes, which exceeds the requirements for f32 (4 bytes) and i16 (2 bytes). 
+ let input_data = unsafe { + Data::from_parts( + input_buffer.as_mut_ptr() as *mut (), + input_samples, + sample_format, + ) + }; + + let callback_info = DuplexCallbackInfo::new(input_timestamp, output_timestamp); + data_callback(&input_data, &mut output_data, &callback_info); + + // Return 0 (noErr) to indicate successful render + 0 + }, + ); + + // Box the wrapper and get raw pointer for CoreAudio + let wrapper = Box::new(DuplexProcWrapper { + callback: duplex_proc, + }); + let wrapper_ptr = Box::into_raw(wrapper); + + // Set up the render callback + let render_callback = AURenderCallbackStruct { + inputProc: Some(duplex_input_proc), + inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, + }; + + audio_unit.set_property( + kAudioUnitProperty_SetRenderCallback, + Scope::Global, + Element::Output, + Some(&render_callback), + )?; + + // Create the stream inner, storing the callback pointer for cleanup + let inner = StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: None, + duplex_callback_ptr: Some(DuplexCallbackPtr(wrapper_ptr)), + }; + + // Create error callback for stream - either dummy or real based on device type. + // For duplex, only swallow disconnect if the device is the default for both + // roles — otherwise Core Audio won't re-route both directions. 
+ let error_callback_for_stream: super::ErrorCallback = + if is_default_input_device(self) && is_default_output_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback.clone(); + Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }) + }; + + // Create the duplex stream + let stream = Stream::new(inner, error_callback_for_stream)?; + + // Start the audio unit + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })? + .audio_unit + .start()?; + + Ok(stream) + } } /// Configure stream format and buffer size for CoreAudio stream. @@ -1030,3 +1512,76 @@ pub(crate) fn get_device_buffer_frame_size( )?; Ok(frames as usize) } + +// ============================================================================ +// Duplex callback infrastructure +// ============================================================================ + +use std::ffi::c_void; + +/// Type alias for the duplex callback closure. +/// +/// This is the raw callback signature that CoreAudio expects. The closure +/// receives the same parameters as a C callback and returns an OSStatus. +type DuplexProcFn = dyn FnMut( + NonNull, + NonNull, + u32, // bus_number + u32, // num_frames + *mut AudioBufferList, +) -> i32; + +/// Wrapper for the boxed duplex callback closure. +/// +/// This struct is allocated on the heap and its pointer is passed to CoreAudio +/// as the refcon. The extern "C" callback function casts the refcon back to +/// this type and calls the closure. +pub(crate) struct DuplexProcWrapper { + callback: Box, +} + +// SAFETY: DuplexProcWrapper is Send because: +// 1. The boxed closure captures only Send types (the DuplexCallback trait requires Send) +// 2. 
The raw pointer stored in StreamInner is accessed: +// - By CoreAudio's audio thread via `duplex_input_proc` (as the refcon) +// - During Drop, after stopping the audio unit (callback no longer running) +// These never overlap: Drop stops the audio unit before reclaiming the pointer. +// 3. CoreAudio guarantees single-threaded callback invocation +unsafe impl Send for DuplexProcWrapper {} + +/// CoreAudio render callback for duplex audio. +/// +/// This is a thin wrapper that casts the refcon back to our DuplexProcWrapper +/// and calls the inner closure. The closure owns all the callback state via +/// move semantics, so no Mutex is needed. +/// +/// Note: `extern "C-unwind"` is required here because `AURenderCallbackStruct` +/// from coreaudio-sys types the `inputProc` field as `extern "C-unwind"`. +/// Ideally this would be `extern "C"` so that a panic aborts rather than +/// unwinding through CoreAudio's C frames (this callback runs on CoreAudio's +/// audio thread with no Rust frames above to catch an unwind). Per RFC 2945 +/// (https://github.com/rust-lang/rust/issues/115285), `extern "C"` aborts on +/// panic, which would be the correct behavior here. +extern "C-unwind" fn duplex_input_proc( + in_ref_con: NonNull, + io_action_flags: NonNull, + in_time_stamp: NonNull, + in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList, +) -> i32 { + // SAFETY: `in_ref_con` points to a heap-allocated `DuplexProcWrapper` created + // via `Box::into_raw` in `build_duplex_stream`, and remains valid for the + // lifetime of the audio unit (reclaimed in `StreamInner::drop`). The `as_mut()` call + // produces an exclusive `&mut` reference, which is sound because CoreAudio + // guarantees single-threaded callback invocation — this function is never + // called concurrently, so only one `&mut` to the wrapper exists at a time. 
+ let wrapper = unsafe { in_ref_con.cast::().as_mut() }; + (wrapper.callback)( + io_action_flags, + in_time_stamp, + in_bus_number, + in_number_frames, + io_data, + ) +} diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index d94525037..6f617ee8b 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -170,6 +170,20 @@ impl DisconnectManager { } } +/// Owned pointer to the duplex callback wrapper that is safe to send across threads. +/// +/// SAFETY: The pointer is created via `Box::into_raw` on the build thread and shared with +/// CoreAudio via `inputProcRefCon`. CoreAudio dereferences it on every render callback on +/// its single-threaded audio thread for the lifetime of the stream. On drop, the audio unit +/// is stopped before reclaiming the `Box`, preventing use-after-free. `Send` is sound because +/// there is no concurrent mutable access—the build/drop thread never accesses the pointer +/// while the audio unit is running, and only reclaims it after stopping the audio unit. +struct DuplexCallbackPtr(*mut device::DuplexProcWrapper); + +// SAFETY: See above — the pointer is shared with CoreAudio's audio thread but never +// accessed concurrently. The audio unit is stopped before reclaiming in drop. +unsafe impl Send for DuplexCallbackPtr {} + struct StreamInner { playing: bool, audio_unit: AudioUnit, @@ -182,13 +196,22 @@ struct StreamInner { /// Manage the lifetime of the aggregate device used /// for loopback recording _loopback_device: Option, + /// Pointer to the duplex callback wrapper, manually managed for duplex streams. + /// + /// coreaudio-rs doesn't support duplex streams (enabling both input and output + /// simultaneously), so we cannot use its `set_render_callback` API which would + /// manage the callback lifetime automatically. Instead, we manually manage this + /// callback pointer (created via `Box::into_raw`) and clean it up in Drop. 
+ /// + /// This is None for regular input/output streams. + duplex_callback_ptr: Option, } impl StreamInner { fn play(&mut self) -> Result<(), PlayStreamError> { if !self.playing { if let Err(e) = self.audio_unit.start() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -200,7 +223,7 @@ impl StreamInner { fn pause(&mut self) -> Result<(), PauseStreamError> { if self.playing { if let Err(e) = self.audio_unit.stop() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -210,6 +233,31 @@ impl StreamInner { } } +impl Drop for StreamInner { + fn drop(&mut self) { + // Clean up duplex callback if present. + if let Some(DuplexCallbackPtr(ptr)) = self.duplex_callback_ptr { + if !ptr.is_null() { + // Stop the audio unit to ensure the callback is no longer being called + // before reclaiming duplex_callback_ptr below. We must stop here regardless + // of AudioUnit::drop's behavior. + // Note: AudioUnit::drop will also call stop() — likely safe, but we stop here anyway. + let _ = self.audio_unit.stop(); + // SAFETY: `ptr` was created via `Box::into_raw` in + // `build_duplex_stream` and has not been reclaimed elsewhere. + // The audio unit was stopped above, so the callback no longer + // holds a reference to this pointer. + unsafe { + let _ = Box::from_raw(ptr); + } + } + } + + // AudioUnit's own Drop will handle uninitialize and dispose + // _loopback_device's Drop will handle aggregate device cleanup + } +} + pub struct Stream { inner: Arc>, // Manages the device disconnection listener separately to allow Stream to be Send. 
diff --git a/src/lib.rs b/src/lib.rs index 71787ccfc..49ea11d66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,6 +192,7 @@ use std::convert::TryInto; use std::time::Duration; pub mod device_description; +pub mod duplex; mod error; mod host; pub mod platform; diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 116d1b1d0..8493b7a5b 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -522,6 +522,29 @@ macro_rules! impl_platform_host { )* } } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &mut crate::Data, &crate::duplex::DuplexCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + match self.0 { + $( + $(#[cfg($feat)])? + DeviceInner::$HostVariant(ref d) => d + .build_duplex_stream_raw(config, sample_format, data_callback, error_callback, timeout) + .map(StreamInner::$HostVariant) + .map(Stream::from), + )* + } + } } impl crate::traits::HostTrait for Host { diff --git a/src/traits.rs b/src/traits.rs index 0b0ee0e8c..703138a0a 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -8,6 +8,7 @@ use std::time::Duration; use crate::{ + duplex::{DuplexCallbackInfo, DuplexStreamConfig}, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, PauseStreamError, PlayStreamError, SampleFormat, SizedSample, StreamConfig, @@ -94,10 +95,12 @@ pub trait DeviceTrait { type SupportedInputConfigs: Iterator; /// The iterator type yielding supported output stream formats. type SupportedOutputConfigs: Iterator; - /// The stream type created by [`build_input_stream_raw`] and [`build_output_stream_raw`]. 
+ /// The stream type created by [`build_input_stream_raw`], [`build_output_stream_raw`], + /// and [`build_duplex_stream_raw`]. /// /// [`build_input_stream_raw`]: Self::build_input_stream_raw /// [`build_output_stream_raw`]: Self::build_output_stream_raw + /// [`build_duplex_stream_raw`]: Self::build_duplex_stream_raw type Stream: StreamTrait; /// The human-readable name of the device. @@ -139,6 +142,15 @@ pub trait DeviceTrait { .is_ok_and(|mut iter| iter.next().is_some()) } + /// True if the device supports duplex (simultaneous input and output), otherwise false. + /// + /// Duplex operation requires the device to support both input and output with a shared + /// hardware clock. This is typically true for audio interfaces but may not be available + /// on all devices (e.g., output-only speakers or input-only microphones). + fn supports_duplex(&self) -> bool { + self.supports_input() && self.supports_output() + } + /// An iterator yielding formats that are supported by the backend. /// /// Can return an error if the device is no longer valid (e.g. it has been disconnected). @@ -286,6 +298,71 @@ pub trait DeviceTrait { where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static; + + /// Create a duplex stream with synchronized input and output sharing the same hardware clock. + /// + /// Essential for DAWs, real-time effects, and any application requiring sample-accurate I/O sync. + /// + /// Check [`supports_duplex`](Self::supports_duplex) before calling. See the example below for usage. 
+ fn build_duplex_stream( + &self, + config: &DuplexStreamConfig, + mut data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + T: SizedSample, + D: FnMut(&[T], &mut [T], &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + self.build_duplex_stream_raw( + config, + T::FORMAT, + move |input, output, info| { + data_callback( + input + .as_slice() + .expect("host supplied incorrect sample type"), + output + .as_slice_mut() + .expect("host supplied incorrect sample type"), + info, + ) + }, + error_callback, + timeout, + ) + } + + /// Create a dynamically typed duplex stream. + /// + /// This method allows working with sample data as raw bytes, useful when the sample + /// format is determined at runtime. For compile-time known formats, prefer + /// [`build_duplex_stream`](Self::build_duplex_stream). + /// + /// # Parameters + /// + /// * `config` - The duplex stream configuration specifying channels, sample rate, and buffer size. + /// * `sample_format` - The sample format of the audio data. + /// * `data_callback` - Called periodically with synchronized input and output buffers as [`Data`]. + /// * `error_callback` - Called when a stream error occurs (e.g., device disconnected). + /// * `timeout` - Optional timeout for backend operations. `None` indicates blocking behavior, + /// `Some(duration)` sets a maximum wait time. Not all backends support timeouts. + fn build_duplex_stream_raw( + &self, + _config: &DuplexStreamConfig, + _sample_format: SampleFormat, + _data_callback: D, + _error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Err(BuildStreamError::StreamConfigNotSupported) + } } /// A stream created from [`Device`](DeviceTrait), with methods to control playback. 
From 831a1751646f915e2050da9723c8f94ec57064de Mon Sep 17 00:00:00 2001 From: Mark Gulbrandsen Date: Wed, 11 Mar 2026 17:48:32 -0700 Subject: [PATCH 2/4] feat: duplex support PR feedback updates. --- README.md | 2 +- examples/duplex_feedback.rs | 43 ++++---- src/duplex.rs | 17 +-- src/host/coreaudio/macos/device.rs | 172 +++++++++++++---------------- src/host/coreaudio/macos/mod.rs | 2 +- src/traits.rs | 2 - 6 files changed, 104 insertions(+), 134 deletions(-) diff --git a/README.md b/README.md index 6523935a7..5a3903254 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ This library currently supports the following: - Enumerate known supported input and output stream formats for a device. - Get the current default input and output stream formats for a device. - Build and run input and output PCM streams on a chosen device with a given stream format. -- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization (macOS only, more platforms coming soon). +- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization. 
Currently, supported platforms include: diff --git a/examples/duplex_feedback.rs b/examples/duplex_feedback.rs index c3533a318..49bf4c19d 100644 --- a/examples/duplex_feedback.rs +++ b/examples/duplex_feedback.rs @@ -11,7 +11,7 @@ mod imp { use clap::Parser; use cpal::duplex::DuplexStreamConfig; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; - use cpal::BufferSize; + use cpal::{BufferSize, ChannelCount, FrameCount, Sample, SampleRate}; #[derive(Parser, Debug)] #[command(version, about = "CPAL duplex feedback example", long_about = None)] @@ -22,19 +22,19 @@ mod imp { /// Number of input channels #[arg(long, value_name = "CHANNELS", default_value_t = 2)] - input_channels: u16, + input_channels: ChannelCount, /// Number of output channels #[arg(long, value_name = "CHANNELS", default_value_t = 2)] - output_channels: u16, + output_channels: ChannelCount, /// Sample rate in Hz #[arg(short, long, value_name = "RATE", default_value_t = 48000)] - sample_rate: u32, + sample_rate: SampleRate, - /// Buffer size in frames - #[arg(short, long, value_name = "FRAMES", default_value_t = 512)] - buffer_size: u32, + /// Buffer size in frames (omit for device default) + #[arg(short, long, value_name = "FRAMES")] + buffer_size: Option, } pub fn run() -> anyhow::Result<()> { @@ -42,13 +42,15 @@ mod imp { let host = cpal::default_host(); // Find the device by device ID or use default - let device = if let Some(device_id_str) = opt.device { - let device_id = device_id_str.parse().expect("failed to parse device id"); - host.device_by_id(&device_id) - .unwrap_or_else(|| panic!("failed to find device with id: {}", device_id_str)) - } else { - host.default_output_device() - .expect("no default output device") + let device = match opt.device { + Some(device_id_str) => { + let device_id = device_id_str.parse().expect("failed to parse device id"); + host.device_by_id(&device_id) + .expect(&format!("failed to find device with id: {}", device_id_str)) + } + None => host + 
.default_output_device() + .expect("no default output device"), }; println!("Using device: \"{}\"", device.description()?.name()); @@ -58,7 +60,10 @@ mod imp { input_channels: opt.input_channels, output_channels: opt.output_channels, sample_rate: opt.sample_rate, - buffer_size: BufferSize::Fixed(opt.buffer_size), + buffer_size: opt + .buffer_size + .map(|s| BufferSize::Fixed(s)) + .unwrap_or(BufferSize::Default), }; println!("Building duplex stream with config: {config:?}"); @@ -66,7 +71,7 @@ mod imp { let stream = device.build_duplex_stream::( &config, move |input, output, _info| { - output.fill(0.0); + output.fill(Sample::EQUILIBRIUM); let copy_len = input.len().min(output.len()); output[..copy_len].copy_from_slice(&input[..copy_len]); }, @@ -76,7 +81,7 @@ mod imp { println!("Successfully built duplex stream."); println!( - "Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {} frames", + "Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {:?} frames", opt.input_channels, opt.output_channels, opt.sample_rate, opt.buffer_size ); @@ -86,7 +91,6 @@ mod imp { println!("Playing for 10 seconds... (speak into your microphone)"); std::thread::sleep(std::time::Duration::from_secs(10)); - drop(stream); println!("Done!"); Ok(()) } @@ -98,7 +102,6 @@ fn main() { #[cfg(not(target_os = "macos"))] { - eprintln!("Duplex streams are currently only supported on macOS."); - eprintln!("Windows (WASAPI) and Linux (ALSA) support is planned."); + eprintln!("Duplex streams are not supported on this platform."); } } diff --git a/src/duplex.rs b/src/duplex.rs index f05b9e371..6f36d9e7b 100644 --- a/src/duplex.rs +++ b/src/duplex.rs @@ -3,20 +3,9 @@ //! This module provides types for building duplex (simultaneous input/output) audio streams //! with hardware clock synchronization. //! -//! # Overview -//! -//! Unlike separate input and output streams which may have independent clocks, a duplex stream -//! 
uses a single device context for both input and output, ensuring they share the same -//! hardware clock. This is essential for applications like: -//! -//! - DAWs (Digital Audio Workstations) -//! - Real-time audio effects processing -//! - Audio measurement and analysis -//! - Any application requiring sample-accurate I/O synchronization -//! //! See `examples/duplex_feedback.rs` for a working example. -use crate::{InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; +use crate::{ChannelCount, InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; /// Information passed to duplex callbacks. /// @@ -66,10 +55,10 @@ impl DuplexCallbackInfo { #[derive(Clone, Debug, Eq, PartialEq)] pub struct DuplexStreamConfig { /// Number of input channels. - pub input_channels: u16, + pub input_channels: ChannelCount, /// Number of output channels. - pub output_channels: u16, + pub output_channels: ChannelCount, /// Sample rate in Hz. pub sample_rate: SampleRate, diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 0ee422445..2ce88626f 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -8,6 +8,7 @@ use crate::duplex::DuplexCallbackInfo; use crate::host::coreaudio::macos::loopback::LoopbackDevice; use crate::host::coreaudio::macos::StreamInner; use crate::traits::DeviceTrait; +use crate::StreamInstant; use crate::{ BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceId, DeviceIdError, DeviceNameError, InputCallbackInfo, @@ -56,6 +57,11 @@ use super::invoke_error_callback; use super::property_listener::AudioObjectPropertyListener; use coreaudio::audio_unit::macos_helpers::get_device_name; +/// Value for `kAudioOutputUnitProperty_EnableIO` to enable I/O on an AudioUnit element. +const AUDIO_UNIT_IO_ENABLED: u32 = 1; +/// Value for `kAudioOutputUnitProperty_EnableIO` to disable I/O on an AudioUnit element. 
+const AUDIO_UNIT_IO_DISABLED: u32 = 0; + /// Attempt to set the device sample rate to the provided rate. /// Return an error if the requested sample rate is not supported by the device. fn set_sample_rate( @@ -221,21 +227,19 @@ fn audio_unit_from_device(device: &Device, input: bool) -> Result( + callback_instant: StreamInstant, + delay: Duration, + error_callback: &Mutex, +) -> StreamInstant +where + E: FnMut(StreamError) + Send, +{ + callback_instant.sub(delay).unwrap_or_else(|| { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp underflow calculating capture time".into(), + }, + }, + ); + callback_instant + }) +} + +fn estimate_playback_instant( + callback_instant: StreamInstant, + delay: Duration, + error_callback: &Mutex, +) -> StreamInstant +where + E: FnMut(StreamError) + Send, +{ + callback_instant.add(delay).unwrap_or_else(|| { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp overflow calculating playback time".into(), + }, + }, + ); + callback_instant + }) +} + impl DeviceTrait for Device { type SupportedInputConfigs = SupportedInputConfigs; type SupportedOutputConfigs = SupportedOutputConfigs; @@ -818,9 +864,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = estimate_capture_instant(callback, delay, &error_callback); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -919,9 +963,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - 
.expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = estimate_playback_instant(callback, delay, &error_callback); let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; @@ -964,60 +1006,6 @@ impl Device { Ok(stream) } - /// Calculate latency-adjusted capture and playback timestamps with graceful error handling. - /// - /// Note: input/output streams use .expect() here and will panic - while it is - /// inconsistent, I believe it is safer for duplex to handle gracefully on the real-time - /// audio thread. Errors here should be extremely rare (timestamp arithmetic - /// overflow/underflow). - /// - /// While theoretically and probably practically impossible, this is a critical - /// user-experience issue -- the app shouldn't crash and potentially lose end-user data. - /// The only reason `sub` would overflow is if delay is massively too large, which would - /// indicate some kind of major bug or failure in the OS since callback_instant is derived - /// from host time. Still, I think the best practice is NOT to panic but tell the app - /// and fallback to a degraded latency estimate of no latency. 
- fn calculate_duplex_timestamps( - callback_instant: crate::StreamInstant, - delay: std::time::Duration, - error_callback: &Arc>, - ) -> (crate::StreamInstant, crate::StreamInstant) - where - E: FnMut(StreamError) + Send + 'static, - { - let capture = match callback_instant.sub(delay) { - Some(c) => c, - None => { - invoke_error_callback( - error_callback, - StreamError::BackendSpecific { - err: BackendSpecificError { - description: "Timestamp underflow calculating capture time".to_string(), - }, - }, - ); - callback_instant - } - }; - - let playback = match callback_instant.add(delay) { - Some(p) => p, - None => { - invoke_error_callback( - error_callback, - StreamError::BackendSpecific { - err: BackendSpecificError { - description: "Timestamp overflow calculating playback time".to_string(), - }, - }, - ); - callback_instant - } - }; - - (capture, playback) - } - /// Build a duplex stream with synchronized input and output. /// /// This creates a single HAL AudioUnit with both input and output enabled, @@ -1035,7 +1023,7 @@ impl Device { E: FnMut(StreamError) + Send + 'static, { // Validate that device supports duplex - if !self.supports_input() || !self.supports_output() { + if !self.supports_duplex() { return Err(BuildStreamError::StreamConfigNotSupported); } @@ -1046,14 +1034,12 @@ impl Device { let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; // Enable BOTH input and output on the AudioUnit - let enable: u32 = 1; - // Enable input on Element 1 audio_unit.set_property( kAudioOutputUnitProperty_EnableIO, Scope::Input, Element::Input, - Some(&enable), + Some(&AUDIO_UNIT_IO_ENABLED), )?; // Enable output on Element 0 (usually enabled by default, but be explicit) @@ -1061,7 +1047,7 @@ impl Device { kAudioOutputUnitProperty_EnableIO, Scope::Output, Element::Output, - Some(&enable), + Some(&AUDIO_UNIT_IO_ENABLED), )?; // Set device for the unit (applies to both input and output) @@ -1074,14 +1060,14 @@ impl Device { // Create 
StreamConfig for input side let input_stream_config = StreamConfig { - channels: config.input_channels as ChannelCount, + channels: config.input_channels, sample_rate: config.sample_rate, buffer_size: config.buffer_size, }; // Create StreamConfig for output side let output_stream_config = StreamConfig { - channels: config.output_channels as ChannelCount, + channels: config.output_channels, sample_rate: config.sample_rate, buffer_size: config.buffer_size, }; @@ -1149,20 +1135,10 @@ impl Device { // changes // Recovery requires the user to destroy and recreate the stream, which will query // the new buffer size range and allocate appropriately. - let buffer_size_range = get_io_buffer_frame_size_range(&audio_unit)?; - let max_buffer_size = match buffer_size_range { + let max_buffer_size = match get_io_buffer_frame_size_range(&audio_unit)? { crate::SupportedBufferSize::Range { max, .. } => max, crate::SupportedBufferSize::Unknown => { - // Fallback: query current size and add headroom for safety - let current: u32 = audio_unit - .get_property( - kAudioDevicePropertyBufferFrameSize, - Scope::Global, - Element::Output, - ) - .unwrap_or(512); - // Add 2x headroom when range is unknown to handle potential size increases - current * 2 + return Err(BuildStreamError::StreamConfigNotSupported); } }; @@ -1272,7 +1248,9 @@ impl Device { ); let delay = frames_to_duration(latency_frames, sample_rate); - let (capture, playback) = Self::calculate_duplex_timestamps( + let capture = + estimate_capture_instant(callback_instant, delay, &error_callback_for_callback); + let playback = estimate_playback_instant( callback_instant, delay, &error_callback_for_callback, @@ -1557,11 +1535,8 @@ unsafe impl Send for DuplexProcWrapper {} /// /// Note: `extern "C-unwind"` is required here because `AURenderCallbackStruct` /// from coreaudio-sys types the `inputProc` field as `extern "C-unwind"`. 
-/// Ideally this would be `extern "C"` so that a panic aborts rather than -/// unwinding through CoreAudio's C frames (this callback runs on CoreAudio's -/// audio thread with no Rust frames above to catch an unwind). Per RFC 2945 -/// (https://github.com/rust-lang/rust/issues/115285), `extern "C"` aborts on -/// panic, which would be the correct behavior here. +/// We use `catch_unwind` to prevent panics from unwinding through CoreAudio's +/// C frames, which would be undefined behavior. extern "C-unwind" fn duplex_input_proc( in_ref_con: NonNull, io_action_flags: NonNull, @@ -1577,11 +1552,16 @@ extern "C-unwind" fn duplex_input_proc( // guarantees single-threaded callback invocation — this function is never // called concurrently, so only one `&mut` to the wrapper exists at a time. let wrapper = unsafe { in_ref_con.cast::().as_mut() }; - (wrapper.callback)( - io_action_flags, - in_time_stamp, - in_bus_number, - in_number_frames, - io_data, - ) + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + (wrapper.callback)( + io_action_flags, + in_time_stamp, + in_bus_number, + in_number_frames, + io_data, + ) + })) { + Ok(result) => result, + Err(_) => kAudio_ParamError, + } } diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index 6f617ee8b..0a313814a 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -61,7 +61,7 @@ type ErrorCallback = Box; /// Invoke error callback, recovering from poisoned mutex if needed. /// Returns true if callback was invoked, false if skipped due to WouldBlock. 
#[inline]
-fn invoke_error_callback(error_callback: &Arc>, err: crate::StreamError) -> bool
+fn invoke_error_callback(error_callback: &Mutex, err: crate::StreamError) -> bool
 where
     E: FnMut(crate::StreamError) + Send,
 {
diff --git a/src/traits.rs b/src/traits.rs
index 703138a0a..fc2c9650c 100644
--- a/src/traits.rs
+++ b/src/traits.rs
@@ -301,8 +301,6 @@ pub trait DeviceTrait {
     /// Create a duplex stream with synchronized input and output sharing the same hardware clock.
     ///
-    /// Essential for DAWs, real-time effects, and any application requiring sample-accurate I/O sync.
-    ///
     /// Check [`supports_duplex`](Self::supports_duplex) before calling. See the example below for usage.
     fn build_duplex_stream(
         &self,
From 73179a0638e4fc60b2433153ffc847513502d49e Mon Sep 17 00:00:00 2001
From: Mark Gulbrandsen
Date: Sun, 22 Mar 2026 18:06:12 -0700
Subject: [PATCH 3/4] address feedback (I believe that this captures all changes requested)

---
 examples/duplex_feedback.rs | 10 +-
 src/duplex.rs | 34 +-
 src/host/coreaudio/macos/device.rs | 508 ++---------------------------
 src/host/coreaudio/macos/duplex.rs | 429 ++++++++++++++++++++++++
 src/host/coreaudio/macos/mod.rs | 110 +++++--
 5 files changed, 533 insertions(+), 558 deletions(-)
 create mode 100644 src/host/coreaudio/macos/duplex.rs

diff --git a/examples/duplex_feedback.rs b/examples/duplex_feedback.rs
index 49bf4c19d..1633caa95 100644
--- a/examples/duplex_feedback.rs
+++ b/examples/duplex_feedback.rs
@@ -1,10 +1,4 @@
-//! Feeds back the input stream directly into the output stream using a duplex stream.
-//!
-//! Unlike the `feedback.rs` example which uses separate input/output streams with a ring buffer,
-//! duplex streams provide hardware-synchronized input/output without additional buffering.
-//!
-//! Note: Currently only supported on macOS (CoreAudio). Windows (WASAPI) and Linux (ALSA)
-//! implementations are planned.
+// Duplex feedback example: feeds the input stream directly into the output.
#[cfg(target_os = "macos")] mod imp { @@ -41,7 +35,6 @@ mod imp { let opt = Opt::parse(); let host = cpal::default_host(); - // Find the device by device ID or use default let device = match opt.device { Some(device_id_str) => { let device_id = device_id_str.parse().expect("failed to parse device id"); @@ -55,7 +48,6 @@ mod imp { println!("Using device: \"{}\"", device.description()?.name()); - // Create duplex stream configuration. let config = DuplexStreamConfig { input_channels: opt.input_channels, output_channels: opt.output_channels, diff --git a/src/duplex.rs b/src/duplex.rs index 6f36d9e7b..25f39ab36 100644 --- a/src/duplex.rs +++ b/src/duplex.rs @@ -1,17 +1,6 @@ -//! Duplex audio stream support with synchronized input/output. -//! -//! This module provides types for building duplex (simultaneous input/output) audio streams -//! with hardware clock synchronization. -//! -//! See `examples/duplex_feedback.rs` for a working example. - use crate::{ChannelCount, InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; -/// Information passed to duplex callbacks. -/// -/// This contains timing information for the current audio buffer, combining -/// both input and output timing. A duplex stream has a single callback invocation -/// that provides synchronized input and output data. +// Timing information for a duplex callback, combining input and output timestamps. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct DuplexCallbackInfo { input_timestamp: InputStreamTimestamp, @@ -19,10 +8,6 @@ pub struct DuplexCallbackInfo { } impl DuplexCallbackInfo { - /// Create a new DuplexCallbackInfo. - /// - /// Note: Both timestamps will share the same `callback` instant since there is - /// only one callback invocation for a duplex stream. pub fn new( input_timestamp: InputStreamTimestamp, output_timestamp: OutputStreamTimestamp, @@ -33,36 +18,19 @@ impl DuplexCallbackInfo { } } - /// The timestamp for the input portion of the duplex stream. 
- /// - /// Contains the callback instant and when the input data was captured. pub fn input_timestamp(&self) -> InputStreamTimestamp { self.input_timestamp } - /// The timestamp for the output portion of the duplex stream. - /// - /// Contains the callback instant and when the output data will be played. pub fn output_timestamp(&self) -> OutputStreamTimestamp { self.output_timestamp } } -/// Configuration for a duplex audio stream. -/// -/// Unlike separate input/output streams, duplex streams require matching -/// configuration for both directions since they share a single device context. #[derive(Clone, Debug, Eq, PartialEq)] pub struct DuplexStreamConfig { - /// Number of input channels. pub input_channels: ChannelCount, - - /// Number of output channels. pub output_channels: ChannelCount, - - /// Sample rate in Hz. pub sample_rate: SampleRate, - - /// Requested buffer size in frames. pub buffer_size: crate::BufferSize, } diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 2ce88626f..40bb08193 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -1,9 +1,6 @@ use super::OSStatus; use super::Stream; -use super::{ - asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant, - DuplexCallbackPtr, -}; +use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant}; use crate::duplex::DuplexCallbackInfo; use crate::host::coreaudio::macos::loopback::LoopbackDevice; use crate::host::coreaudio::macos::StreamInner; @@ -19,9 +16,7 @@ use coreaudio::audio_unit::render_callback::{self, data}; use coreaudio::audio_unit::{AudioUnit, Element, Scope}; use objc2_audio_toolbox::{ kAudioOutputUnitProperty_CurrentDevice, kAudioOutputUnitProperty_EnableIO, - kAudioUnitErr_TooManyFramesToProcess, kAudioUnitProperty_SetRenderCallback, - kAudioUnitProperty_StreamFormat, AURenderCallbackStruct, AudioUnitRender, - AudioUnitRenderActionFlags, + 
kAudioUnitProperty_StreamFormat, }; use objc2_core_audio::kAudioDevicePropertyDeviceUID; use objc2_core_audio::kAudioObjectPropertyElementMain; @@ -37,11 +32,11 @@ use objc2_core_audio::{ AudioObjectPropertyScope, AudioObjectSetPropertyData, }; use objc2_core_audio_types::{ - kAudio_ParamError, AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioTimeStamp, - AudioValueRange, + AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange, }; use objc2_core_foundation::CFString; use objc2_core_foundation::Type; +use std::mem::ManuallyDrop; pub use super::enumerate::{ default_input_device, default_output_device, SupportedInputConfigs, SupportedOutputConfigs, @@ -57,14 +52,13 @@ use super::invoke_error_callback; use super::property_listener::AudioObjectPropertyListener; use coreaudio::audio_unit::macos_helpers::get_device_name; -/// Value for `kAudioOutputUnitProperty_EnableIO` to enable I/O on an AudioUnit element. -const AUDIO_UNIT_IO_ENABLED: u32 = 1; +pub(super) const AUDIO_UNIT_IO_ENABLED: u32 = 1; /// Value for `kAudioOutputUnitProperty_EnableIO` to disable I/O on an AudioUnit element. const AUDIO_UNIT_IO_DISABLED: u32 = 0; /// Attempt to set the device sample rate to the provided rate. /// Return an error if the requested sample rate is not supported by the device. 
-fn set_sample_rate( +pub(super) fn set_sample_rate( audio_device_id: AudioObjectID, target_sample_rate: SampleRate, ) -> Result<(), BuildStreamError> { @@ -271,7 +265,7 @@ fn get_io_buffer_frame_size_range( }) } -fn estimate_capture_instant( +pub(super) fn estimate_capture_instant( callback_instant: StreamInstant, delay: Duration, error_callback: &Mutex, @@ -292,7 +286,7 @@ where }) } -fn estimate_playback_instant( +pub(super) fn estimate_playback_instant( callback_instant: StreamInstant, delay: Duration, error_callback: &Mutex, @@ -402,7 +396,14 @@ impl DeviceTrait for Device { D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - Device::build_duplex_stream_raw(self, config, sample_format, data_callback, error_callback) + Device::build_duplex_stream_raw( + self, + config, + sample_format, + data_callback, + error_callback, + _timeout, + ) } } @@ -885,12 +886,13 @@ impl Device { let stream = Stream::new( StreamInner { playing: true, - audio_unit, + audio_unit: ManuallyDrop::new(audio_unit), device_id: self.audio_device_id, _loopback_device: loopback_aggregate, duplex_callback_ptr: None, }, error_callback_for_stream, + false, )?; stream @@ -984,12 +986,13 @@ impl Device { let stream = Stream::new( StreamInner { playing: true, - audio_unit, + audio_unit: ManuallyDrop::new(audio_unit), device_id: self.audio_device_id, _loopback_device: None, duplex_callback_ptr: None, }, error_callback_for_stream, + false, )?; stream @@ -1005,400 +1008,6 @@ impl Device { Ok(stream) } - - /// Build a duplex stream with synchronized input and output. - /// - /// This creates a single HAL AudioUnit with both input and output enabled, - /// ensuring they share the same hardware clock. 
- /// For details, see: https://developer.apple.com/library/archive/technotes/tn2091/_index.html - fn build_duplex_stream_raw( - &self, - config: &crate::duplex::DuplexStreamConfig, - sample_format: SampleFormat, - data_callback: D, - error_callback: E, - ) -> Result - where - D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, - E: FnMut(StreamError) + Send + 'static, - { - // Validate that device supports duplex - if !self.supports_duplex() { - return Err(BuildStreamError::StreamConfigNotSupported); - } - - // Potentially change the device sample rate to match the config. - set_sample_rate(self.audio_device_id, config.sample_rate)?; - - // Create HAL AudioUnit - always use HalOutput for duplex - let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; - - // Enable BOTH input and output on the AudioUnit - // Enable input on Element 1 - audio_unit.set_property( - kAudioOutputUnitProperty_EnableIO, - Scope::Input, - Element::Input, - Some(&AUDIO_UNIT_IO_ENABLED), - )?; - - // Enable output on Element 0 (usually enabled by default, but be explicit) - audio_unit.set_property( - kAudioOutputUnitProperty_EnableIO, - Scope::Output, - Element::Output, - Some(&AUDIO_UNIT_IO_ENABLED), - )?; - - // Set device for the unit (applies to both input and output) - audio_unit.set_property( - kAudioOutputUnitProperty_CurrentDevice, - Scope::Global, - Element::Output, - Some(&self.audio_device_id), - )?; - - // Create StreamConfig for input side - let input_stream_config = StreamConfig { - channels: config.input_channels, - sample_rate: config.sample_rate, - buffer_size: config.buffer_size, - }; - - // Create StreamConfig for output side - let output_stream_config = StreamConfig { - channels: config.output_channels, - sample_rate: config.sample_rate, - buffer_size: config.buffer_size, - }; - - // Core Audio's HAL AU has two buses, each with a hardware side and a - // client (app) side. 
We set the stream format on the client-facing side; - // the AU's built-in converter handles translation to/from the hardware format. - // - // Mic ─[Scope::Input]──▶ Input Bus ──[Scope::Output]─▶ App - // (hardware side) (client side) - // - // App ─[Scope::Input]──▶ Output Bus ─[Scope::Output]─▶ Speaker - // (client side) (hardware side) - // - // So the client side is Scope::Output for the input bus (where we read - // captured samples) and Scope::Input for the output bus (where we write - // playback samples). See Apple TN2091 for details. - let input_asbd = asbd_from_config(&input_stream_config, sample_format); - audio_unit.set_property( - kAudioUnitProperty_StreamFormat, - Scope::Output, - Element::Input, - Some(&input_asbd), - )?; - - let output_asbd = asbd_from_config(&output_stream_config, sample_format); - audio_unit.set_property( - kAudioUnitProperty_StreamFormat, - Scope::Input, - Element::Output, - Some(&output_asbd), - )?; - - // Buffer frame size is a device-level property. Element::Output (0) is - // the standard convention for Scope::Global properties. - if let BufferSize::Fixed(buffer_size) = &config.buffer_size { - audio_unit.set_property( - kAudioDevicePropertyBufferFrameSize, - Scope::Global, - Element::Output, - Some(buffer_size), - )?; - } - - // Query the buffer size range to determine the maximum possible buffer size. - // - // BUFFER ALLOCATION STRATEGY: - // Duplex streams must pre-allocate an input buffer because AudioUnitRender() requires - // us to provide the buffer. Unlike input/output streams where CoreAudio provides the - // buffer, we cannot dynamically reallocate in the callback without: - // 1. Violating real-time constraints (allocation is not RT-safe) - // 2. Implementing complex double/triple buffering (not worth it for rare edge case) - // - // Therefore, we allocate for the MAXIMUM buffer size reported by the device. 
This - // handles dynamic buffer size changes (e.g., due to sample rate changes or system - // audio settings) transparently and efficiently. - // - // LIMITATION: - // If CoreAudio requests more frames than the device's reported maximum buffer size, - // the stream will fail with kAudioUnitErr_TooManyFramesToProcess. This can happen if: - // - The device's buffer size is changed after stream creation (via System Settings - // or another audio application) - // - The device reconfigures itself (e.g., joins/leaves an aggregate device) - // - The device's supported buffer size range changes due to sample rate or format - // changes - // Recovery requires the user to destroy and recreate the stream, which will query - // the new buffer size range and allocate appropriately. - let max_buffer_size = match get_io_buffer_frame_size_range(&audio_unit)? { - crate::SupportedBufferSize::Range { max, .. } => max, - crate::SupportedBufferSize::Unknown => { - return Err(BuildStreamError::StreamConfigNotSupported); - } - }; - - // Get callback vars for latency calculation (matching input/output pattern) - let sample_rate = config.sample_rate; - let device_buffer_frames = get_device_buffer_frame_size(&audio_unit).ok(); - - // Get the raw AudioUnit pointer for use in the callback - let raw_audio_unit = *audio_unit.as_ref(); - - // Configuration for callback - let input_channels = config.input_channels as usize; - let sample_bytes = sample_format.sample_size(); - - // Pre-allocate input buffer for the maximum buffer size (in bytes). - // This ensures we can handle dynamic buffer size changes without reallocation. - // See the BUFFER ALLOCATION STRATEGY comment above for detailed rationale. 
- let input_buffer_samples = max_buffer_size as usize * input_channels; - let input_buffer_bytes = input_buffer_samples * sample_bytes; - let mut input_buffer: Vec = vec![0u8; input_buffer_bytes]; - - // Wrap error callback in Arc for sharing between callback and disconnect handler - let error_callback = Arc::new(Mutex::new(error_callback)); - let error_callback_for_callback = error_callback.clone(); - - // Move data callback into closure - let mut data_callback = data_callback; - - let duplex_proc: Box = Box::new( - move |io_action_flags: NonNull, - in_time_stamp: NonNull, - _in_bus_number: u32, - in_number_frames: u32, - io_data: *mut AudioBufferList| - -> i32 { - // Validate output buffer structure early to avoid wasted work - if io_data.is_null() { - return kAudio_ParamError; - } - // SAFETY: io_data validated as non-null above - let buffer_list = unsafe { &mut *io_data }; - if buffer_list.mNumberBuffers == 0 { - return kAudio_ParamError; - } - - let num_frames = in_number_frames as usize; - let input_samples = num_frames * input_channels; - let input_bytes = input_samples * sample_bytes; - - // Bounds check: ensure the requested frames don't exceed our pre-allocated buffer. - // This buffer is allocated for the maximum buffer size reported by the device, - // so this check should only fail if CoreAudio requests more frames than the - // device's reported maximum (which would indicate a serious driver/OS issue). - if input_bytes > input_buffer.len() { - invoke_error_callback( - &error_callback_for_callback, - StreamError::BackendSpecific { - err: BackendSpecificError { - description: format!( - "CoreAudio requested {} frames ({} bytes) but the input buffer is \ - only {} bytes (allocated for the device's maximum buffer size at \ - stream creation). This likely means the device's buffer size or \ - configuration changed after the stream was created. 
To recover, \ - destroy and recreate the stream.", - num_frames, input_bytes, input_buffer.len() - ), - }, - }, - ); - // Return error - this is a persistent failure that will happen every callback - // until the user destroys and recreates the stream - return kAudioUnitErr_TooManyFramesToProcess; - } - - // SAFETY: in_time_stamp is valid per CoreAudio contract - let timestamp: &AudioTimeStamp = unsafe { in_time_stamp.as_ref() }; - - // Create StreamInstant for callback_instant - let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { - Err(err) => { - invoke_error_callback(&error_callback_for_callback, err.into()); - // Return 0 (noErr) to keep the stream alive while notifying the error - // callback. This matches input/output stream behavior and allows graceful - // degradation rather than stopping the stream on transient errors. - return 0; - } - Ok(cb) => cb, - }; - - let buffer = &mut buffer_list.mBuffers[0]; - if buffer.mData.is_null() { - return kAudio_ParamError; - } - let output_samples = buffer.mDataByteSize as usize / sample_bytes; - - // SAFETY: buffer.mData validated as non-null above - let mut output_data = unsafe { - Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) - }; - - // Calculate latency-adjusted timestamps (matching input/output pattern) - let buffer_frames = num_frames; - // Use device buffer size for latency calculation if available - let latency_frames = device_buffer_frames.unwrap_or( - // Fallback to callback buffer size if device buffer size is unknown - buffer_frames, - ); - let delay = frames_to_duration(latency_frames, sample_rate); - - let capture = - estimate_capture_instant(callback_instant, delay, &error_callback_for_callback); - let playback = estimate_playback_instant( - callback_instant, - delay, - &error_callback_for_callback, - ); - - // Create callback info with latency-adjusted times - let input_timestamp = crate::InputStreamTimestamp { - callback: callback_instant, - 
capture, - }; - let output_timestamp = crate::OutputStreamTimestamp { - callback: callback_instant, - playback, - }; - - // Pull input from Element 1 using AudioUnitRender - // use the pre-allocated input_buffer - // Set up AudioBufferList pointing to our input buffer - let mut input_buffer_list = AudioBufferList { - mNumberBuffers: 1, - mBuffers: [AudioBuffer { - mNumberChannels: input_channels as u32, - mDataByteSize: input_bytes as u32, - mData: input_buffer.as_mut_ptr() as *mut std::ffi::c_void, - }], - }; - - // SAFETY: AudioUnitRender is called with valid parameters: - // - raw_audio_unit is valid for the callback duration - // - input_buffer_list is created just above on the stack with a pointer to - // input_buffer, which is properly aligned (Rust's standard allocators return - // pointers aligned to at least 8 bytes, exceeding f32/i16 requirements) - // - input_buffer has been bounds-checked above to ensure sufficient capacity - // - All other parameters (timestamps, flags, etc.) come from CoreAudio itself - let status = unsafe { - AudioUnitRender( - raw_audio_unit, - io_action_flags.as_ptr(), - in_time_stamp, - 1, // Element 1 = input - in_number_frames, - NonNull::new_unchecked(&mut input_buffer_list), - ) - }; - - if status != 0 { - // Report error but continue with silence for graceful degradation - // The application should decide what to do. 
- invoke_error_callback( - &error_callback_for_callback, - StreamError::BackendSpecific { - err: BackendSpecificError { - description: format!( - "AudioUnitRender failed for input: OSStatus {}", - status - ), - }, - }, - ); - input_buffer[..input_bytes].fill(0); - } - - // SAFETY: Creating Data from input_buffer is safe because: - // - input_buffer is a valid Vec owned by this closure - // - input_samples (num_frames * input_channels) was bounds-checked above to ensure - // input_samples * sample_bytes <= input_buffer.len() - // - AudioUnitRender just filled the buffer with valid audio data (or we filled - // it with silence on error) - // - The Data lifetime is scoped to this callback and doesn't outlive input_buffer - // - The pointer is suitably aligned: We successfully passed this buffer to - // AudioUnitRender (line 1314), which requires properly aligned buffers and would - // have failed if alignment were incorrect. Additionally, in practice, Rust's - // standard allocators (System, jemalloc, etc.) return pointers aligned to at - // least 8 bytes, which exceeds the requirements for f32 (4 bytes) and i16 (2 bytes). 
- let input_data = unsafe { - Data::from_parts( - input_buffer.as_mut_ptr() as *mut (), - input_samples, - sample_format, - ) - }; - - let callback_info = DuplexCallbackInfo::new(input_timestamp, output_timestamp); - data_callback(&input_data, &mut output_data, &callback_info); - - // Return 0 (noErr) to indicate successful render - 0 - }, - ); - - // Box the wrapper and get raw pointer for CoreAudio - let wrapper = Box::new(DuplexProcWrapper { - callback: duplex_proc, - }); - let wrapper_ptr = Box::into_raw(wrapper); - - // Set up the render callback - let render_callback = AURenderCallbackStruct { - inputProc: Some(duplex_input_proc), - inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, - }; - - audio_unit.set_property( - kAudioUnitProperty_SetRenderCallback, - Scope::Global, - Element::Output, - Some(&render_callback), - )?; - - // Create the stream inner, storing the callback pointer for cleanup - let inner = StreamInner { - playing: true, - audio_unit, - device_id: self.audio_device_id, - _loopback_device: None, - duplex_callback_ptr: Some(DuplexCallbackPtr(wrapper_ptr)), - }; - - // Create error callback for stream - either dummy or real based on device type. - // For duplex, only swallow disconnect if the device is the default for both - // roles — otherwise Core Audio won't re-route both directions. 
- let error_callback_for_stream: super::ErrorCallback = - if is_default_input_device(self) && is_default_output_device(self) { - Box::new(|_: StreamError| {}) - } else { - let error_callback_clone = error_callback.clone(); - Box::new(move |err: StreamError| { - invoke_error_callback(&error_callback_clone, err); - }) - }; - - // Create the duplex stream - let stream = Stream::new(inner, error_callback_for_stream)?; - - // Start the audio unit - stream - .inner - .lock() - .map_err(|_| BuildStreamError::BackendSpecific { - err: BackendSpecificError { - description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), - }, - })? - .audio_unit - .start()?; - - Ok(stream) - } } /// Configure stream format and buffer size for CoreAudio stream. @@ -1478,7 +1087,7 @@ fn setup_callback_vars( /// /// Buffer frame size is a device-level property that always uses Scope::Global + Element::Output, /// regardless of whether the audio unit is configured for input or output streams. -pub(crate) fn get_device_buffer_frame_size( +pub(super) fn get_device_buffer_frame_size( audio_unit: &AudioUnit, ) -> Result { // Device-level property: always use Scope::Global + Element::Output @@ -1490,78 +1099,3 @@ pub(crate) fn get_device_buffer_frame_size( )?; Ok(frames as usize) } - -// ============================================================================ -// Duplex callback infrastructure -// ============================================================================ - -use std::ffi::c_void; - -/// Type alias for the duplex callback closure. -/// -/// This is the raw callback signature that CoreAudio expects. The closure -/// receives the same parameters as a C callback and returns an OSStatus. -type DuplexProcFn = dyn FnMut( - NonNull, - NonNull, - u32, // bus_number - u32, // num_frames - *mut AudioBufferList, -) -> i32; - -/// Wrapper for the boxed duplex callback closure. 
-/// -/// This struct is allocated on the heap and its pointer is passed to CoreAudio -/// as the refcon. The extern "C" callback function casts the refcon back to -/// this type and calls the closure. -pub(crate) struct DuplexProcWrapper { - callback: Box, -} - -// SAFETY: DuplexProcWrapper is Send because: -// 1. The boxed closure captures only Send types (the DuplexCallback trait requires Send) -// 2. The raw pointer stored in StreamInner is accessed: -// - By CoreAudio's audio thread via `duplex_input_proc` (as the refcon) -// - During Drop, after stopping the audio unit (callback no longer running) -// These never overlap: Drop stops the audio unit before reclaiming the pointer. -// 3. CoreAudio guarantees single-threaded callback invocation -unsafe impl Send for DuplexProcWrapper {} - -/// CoreAudio render callback for duplex audio. -/// -/// This is a thin wrapper that casts the refcon back to our DuplexProcWrapper -/// and calls the inner closure. The closure owns all the callback state via -/// move semantics, so no Mutex is needed. -/// -/// Note: `extern "C-unwind"` is required here because `AURenderCallbackStruct` -/// from coreaudio-sys types the `inputProc` field as `extern "C-unwind"`. -/// We use `catch_unwind` to prevent panics from unwinding through CoreAudio's -/// C frames, which would be undefined behavior. -extern "C-unwind" fn duplex_input_proc( - in_ref_con: NonNull, - io_action_flags: NonNull, - in_time_stamp: NonNull, - in_bus_number: u32, - in_number_frames: u32, - io_data: *mut AudioBufferList, -) -> i32 { - // SAFETY: `in_ref_con` points to a heap-allocated `DuplexProcWrapper` created - // via `Box::into_raw` in `build_duplex_stream`, and remains valid for the - // lifetime of the audio unit (reclaimed in `StreamInner::drop`). 
The `as_mut()` call - // produces an exclusive `&mut` reference, which is sound because CoreAudio - // guarantees single-threaded callback invocation — this function is never - // called concurrently, so only one `&mut` to the wrapper exists at a time. - let wrapper = unsafe { in_ref_con.cast::().as_mut() }; - match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - (wrapper.callback)( - io_action_flags, - in_time_stamp, - in_bus_number, - in_number_frames, - io_data, - ) - })) { - Ok(result) => result, - Err(_) => kAudio_ParamError, - } -} diff --git a/src/host/coreaudio/macos/duplex.rs b/src/host/coreaudio/macos/duplex.rs new file mode 100644 index 000000000..864ce75b3 --- /dev/null +++ b/src/host/coreaudio/macos/duplex.rs @@ -0,0 +1,429 @@ +use super::{ + asbd_from_config, frames_to_duration, host_time_to_stream_instant, invoke_error_callback, + DuplexCallbackPtr, Stream, StreamInner, +}; +use crate::duplex::DuplexCallbackInfo; +use crate::{ + BackendSpecificError, BufferSize, BuildStreamError, Data, SampleFormat, StreamConfig, + StreamError, +}; +use coreaudio::audio_unit::{AudioUnit, Element, Scope}; +use objc2_audio_toolbox::{ + kAudioOutputUnitProperty_CurrentDevice, kAudioOutputUnitProperty_EnableIO, + kAudioUnitProperty_SetRenderCallback, kAudioUnitProperty_StreamFormat, AURenderCallbackStruct, + AudioUnitRender, AudioUnitRenderActionFlags, +}; +use objc2_core_audio::kAudioDevicePropertyBufferFrameSize; +use objc2_core_audio_types::{kAudio_ParamError, AudioBuffer, AudioBufferList, AudioTimeStamp}; +use std::ffi::c_void; +use std::mem::ManuallyDrop; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex}; + +use super::device::{ + estimate_capture_instant, estimate_playback_instant, get_device_buffer_frame_size, + set_sample_rate, Device, AUDIO_UNIT_IO_ENABLED, +}; +use crate::traits::DeviceTrait; + +type DuplexProcFn = dyn FnMut( + NonNull, + NonNull, + u32, // bus_number + u32, // num_frames + *mut AudioBufferList, +) -> i32; + +/// Wrapper for the 
boxed duplex callback closure. +/// +/// This struct is allocated on the heap and its pointer is passed to CoreAudio +/// as the refcon. The extern "C" callback function casts the refcon back to +/// this type and calls the closure. +pub(crate) struct DuplexProcWrapper { + callback: Box, +} + +// SAFETY: DuplexProcWrapper is Send because: +// 1. The boxed closure captures only Send types (the DuplexCallback trait requires Send) +// 2. The raw pointer stored in StreamInner is accessed: +// - By CoreAudio's audio thread via `duplex_input_proc` (as the refcon) +// - During Drop, after stopping the audio unit (callback no longer running) +// These never overlap: Drop stops the audio unit before reclaiming the pointer. +// 3. CoreAudio guarantees single-threaded callback invocation +unsafe impl Send for DuplexProcWrapper {} + +/// CoreAudio render callback for duplex audio. +/// +/// This is a thin wrapper that casts the refcon back to our DuplexProcWrapper +/// and calls the inner closure. The closure owns all the callback state via +/// move semantics, so no Mutex is needed. +/// +/// Note: `extern "C-unwind"` is required here because `AURenderCallbackStruct` +/// from coreaudio-sys types the `inputProc` field as `extern "C-unwind"`. +/// We use `catch_unwind` to prevent panics from unwinding through CoreAudio's +/// C frames, which would be undefined behavior. +extern "C-unwind" fn duplex_input_proc( + in_ref_con: NonNull, + io_action_flags: NonNull, + in_time_stamp: NonNull, + in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList, +) -> i32 { + // SAFETY: `in_ref_con` points to a heap-allocated `DuplexProcWrapper` created + // via `Box::into_raw` in `build_duplex_stream_raw`, and remains valid for the + // lifetime of the audio unit (reclaimed in `StreamInner::drop`). 
The `as_mut()` call + // produces an exclusive `&mut` reference, which is sound because CoreAudio + // guarantees single-threaded callback invocation — this function is never + // called concurrently, so only one `&mut` to the wrapper exists at a time. + let wrapper = unsafe { in_ref_con.cast::().as_mut() }; + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + (wrapper.callback)( + io_action_flags, + in_time_stamp, + in_bus_number, + in_number_frames, + io_data, + ) + })) { + Ok(result) => result, + Err(_) => kAudio_ParamError, + } +} + +impl Device { + /// Build a duplex stream with synchronized input and output. + /// + /// This creates a single HAL AudioUnit with both input and output enabled, + /// ensuring they share the same hardware clock. + /// For details, see: https://developer.apple.com/library/archive/technotes/tn2091/_index.html + pub(crate) fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // Validate that device supports duplex + if !self.supports_duplex() { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + // Potentially change the device sample rate to match the config. 
+ set_sample_rate(self.audio_device_id, config.sample_rate)?; + + // Create HAL AudioUnit - always use HalOutput for duplex + let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; + + // Enable BOTH input and output on the AudioUnit + // Enable input on Element 1 + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Input, + Element::Input, + Some(&AUDIO_UNIT_IO_ENABLED), + )?; + + // Enable output on Element 0 (usually enabled by default, but be explicit) + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Output, + Element::Output, + Some(&AUDIO_UNIT_IO_ENABLED), + )?; + + // Set device for the unit (applies to both input and output) + audio_unit.set_property( + kAudioOutputUnitProperty_CurrentDevice, + Scope::Global, + Element::Output, + Some(&self.audio_device_id), + )?; + + // Create StreamConfig for input side + let input_stream_config = StreamConfig { + channels: config.input_channels, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Create StreamConfig for output side + let output_stream_config = StreamConfig { + channels: config.output_channels, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Core Audio's HAL AU has two buses, each with a hardware side and a + // client (app) side. We set the stream format on the client-facing side; + // the AU's built-in converter handles translation to/from the hardware format. + // + // Mic ─[Scope::Input]──▶ Input Bus ──[Scope::Output]─▶ App + // (hardware side) (client side) + // + // App ─[Scope::Input]──▶ Output Bus ─[Scope::Output]─▶ Speaker + // (client side) (hardware side) + // + // So the client side is Scope::Output for the input bus (where we read + // captured samples) and Scope::Input for the output bus (where we write + // playback samples). See Apple TN2091 for details. 
+ let input_asbd = asbd_from_config(&input_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Output, + Element::Input, + Some(&input_asbd), + )?; + + let output_asbd = asbd_from_config(&output_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Input, + Element::Output, + Some(&output_asbd), + )?; + + // Buffer frame size is a device-level property. Element::Output (0) is + // the standard convention for Scope::Global properties. + if let BufferSize::Fixed(buffer_size) = &config.buffer_size { + audio_unit.set_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + Some(buffer_size), + )?; + } + + // Allocate input buffer for the current device buffer size. + let current_buffer_size = get_device_buffer_frame_size(&audio_unit).map_err(|e| { + BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: e.to_string(), + }, + } + })?; + + let sample_rate = config.sample_rate; + let device_buffer_frames = current_buffer_size; + let raw_audio_unit = *audio_unit.as_ref(); + let input_channels = config.input_channels as usize; + let sample_bytes = sample_format.sample_size(); + + let input_buffer_bytes = current_buffer_size * input_channels * sample_bytes; + let mut input_buffer: Box<[u8]> = vec![0u8; input_buffer_bytes].into_boxed_slice(); + + // Wrap error callback in Arc for sharing between callback and disconnect handler + let error_callback = Arc::new(Mutex::new(error_callback)); + let error_callback_for_callback = error_callback.clone(); + + let mut data_callback = data_callback; + let buffer_size_changed = std::sync::atomic::AtomicBool::new(false); + + let duplex_proc: Box = Box::new( + move |io_action_flags: NonNull, + in_time_stamp: NonNull, + _in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList| + -> i32 { + if buffer_size_changed.load(std::sync::atomic::Ordering::Relaxed) { + return 
kAudio_ParamError; + } + + if io_data.is_null() { + return kAudio_ParamError; + } + // SAFETY: io_data validated as non-null above + let buffer_list = unsafe { &mut *io_data }; + if buffer_list.mNumberBuffers == 0 { + return kAudio_ParamError; + } + + let num_frames = in_number_frames as usize; + let input_samples = num_frames * input_channels; + let input_bytes = input_samples * sample_bytes; + + if input_bytes != input_buffer.len() { + buffer_size_changed.store(true, std::sync::atomic::Ordering::Relaxed); + return kAudio_ParamError; + } + + // SAFETY: in_time_stamp is valid per CoreAudio contract + let timestamp: &AudioTimeStamp = unsafe { in_time_stamp.as_ref() }; + + // Create StreamInstant for callback_instant + let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { + Err(err) => { + invoke_error_callback(&error_callback_for_callback, err.into()); + // Return 0 (noErr) to keep the stream alive while notifying the error + // callback. This matches input/output stream behavior and allows graceful + // degradation rather than stopping the stream on transient errors. 
+ return 0; + } + Ok(cb) => cb, + }; + + let buffer = &mut buffer_list.mBuffers[0]; + if buffer.mData.is_null() { + return kAudio_ParamError; + } + let output_samples = buffer.mDataByteSize as usize / sample_bytes; + + // SAFETY: buffer.mData validated as non-null above + let mut output_data = unsafe { + Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) + }; + + let delay = frames_to_duration(device_buffer_frames, sample_rate); + + let capture = + estimate_capture_instant(callback_instant, delay, &error_callback_for_callback); + let playback = estimate_playback_instant( + callback_instant, + delay, + &error_callback_for_callback, + ); + + // Create callback info with latency-adjusted times + let input_timestamp = crate::InputStreamTimestamp { + callback: callback_instant, + capture, + }; + let output_timestamp = crate::OutputStreamTimestamp { + callback: callback_instant, + playback, + }; + + // Pull input from Element 1 using AudioUnitRender + // use the pre-allocated input_buffer + // Set up AudioBufferList pointing to our input buffer + let mut input_buffer_list = AudioBufferList { + mNumberBuffers: 1, + mBuffers: [AudioBuffer { + mNumberChannels: input_channels as u32, + mDataByteSize: input_bytes as u32, + mData: input_buffer.as_mut_ptr() as *mut std::ffi::c_void, + }], + }; + + // SAFETY: AudioUnitRender is called with valid parameters: + // - raw_audio_unit is valid for the callback duration + // - input_buffer_list is created just above on the stack with a pointer to + // input_buffer, which is properly aligned (Rust's standard allocators return + // pointers aligned to at least 8 bytes, exceeding f32/i16 requirements) + // - input_buffer has been bounds-checked above to ensure sufficient capacity + // - All other parameters (timestamps, flags, etc.) 
come from CoreAudio itself + let status = unsafe { + AudioUnitRender( + raw_audio_unit, + io_action_flags.as_ptr(), + in_time_stamp, + 1, // Element 1 = input + in_number_frames, + NonNull::new_unchecked(&mut input_buffer_list), + ) + }; + + if status != 0 { + // Report error but continue with silence for graceful degradation + // The application should decide what to do. + invoke_error_callback( + &error_callback_for_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!( + "AudioUnitRender failed for input: OSStatus {}", + status + ), + }, + }, + ); + input_buffer[..input_bytes].fill(0); + } + + // SAFETY: Creating Data from input_buffer is safe because: + // - input_buffer is a valid heap-allocated boxed slice owned by this closure + // - input_samples (num_frames * input_channels) was bounds-checked above to ensure + // input_samples * sample_bytes <= input_buffer.len() + // - AudioUnitRender just filled the buffer with valid audio data (or we filled + // it with silence on error) + // - The Data lifetime is scoped to this callback and doesn't outlive input_buffer + // - The pointer is suitably aligned: We successfully passed this buffer to + // AudioUnitRender (just above), which requires properly aligned buffers and would + // have failed if alignment were incorrect. Additionally, in practice, Rust's + // standard allocators (System, jemalloc, etc.) return pointers aligned to at + // least 8 bytes, which exceeds the requirements for f32 (4 bytes) and i16 (2 bytes). 
+ let input_data = unsafe { + Data::from_parts( + input_buffer.as_mut_ptr() as *mut (), + input_samples, + sample_format, + ) + }; + + let callback_info = DuplexCallbackInfo::new(input_timestamp, output_timestamp); + data_callback(&input_data, &mut output_data, &callback_info); + + // Return 0 (noErr) to indicate successful render + 0 + }, + ); + + // Box the wrapper and get raw pointer for CoreAudio + let wrapper = Box::new(DuplexProcWrapper { + callback: duplex_proc, + }); + let wrapper_ptr = Box::into_raw(wrapper); + + // Set up the render callback + let render_callback = AURenderCallbackStruct { + inputProc: Some(duplex_input_proc), + inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, + }; + + audio_unit.set_property( + kAudioUnitProperty_SetRenderCallback, + Scope::Global, + Element::Output, + Some(&render_callback), + )?; + + // Create the stream inner, storing the callback pointer for cleanup + let inner = StreamInner { + playing: true, + audio_unit: ManuallyDrop::new(audio_unit), + device_id: self.audio_device_id, + _loopback_device: None, + duplex_callback_ptr: Some(DuplexCallbackPtr(wrapper_ptr)), + }; + + // Always propagate disconnect errors for duplex streams. A duplex stream + // is broken when either direction changes device. + let error_callback_clone = error_callback.clone(); + let error_callback_for_stream: super::ErrorCallback = Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }); + + // Create the duplex stream + let stream = Stream::new(inner, error_callback_for_stream, true)?; + + // Start the audio unit + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })? 
+ .audio_unit + .start()?; + + Ok(stream) + } +} diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index 0a313814a..47c14e355 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -7,17 +7,19 @@ use crate::traits::{HostTrait, StreamTrait}; use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError}; use coreaudio::audio_unit::AudioUnit; use objc2_core_audio::AudioDeviceID; +use std::mem::ManuallyDrop; use std::sync::{mpsc, Arc, Mutex, Weak}; pub use self::enumerate::{default_input_device, default_output_device, Devices}; use objc2_core_audio::{ - kAudioDevicePropertyDeviceIsAlive, kAudioObjectPropertyElementMain, - kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, + kAudioDevicePropertyBufferFrameSize, kAudioDevicePropertyDeviceIsAlive, + kAudioObjectPropertyElementMain, kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, }; use property_listener::AudioObjectPropertyListener; mod device; +mod duplex; pub mod enumerate; mod loopback; mod property_listener; @@ -101,34 +103,65 @@ impl DisconnectManager { device_id: AudioDeviceID, stream_weak: Weak>, error_callback: Arc>, + listen_buffer_size: bool, ) -> Result { let (shutdown_tx, shutdown_rx) = mpsc::channel(); let (disconnect_tx, disconnect_rx) = mpsc::channel(); let (ready_tx, ready_rx) = mpsc::channel(); - // Spawn dedicated thread to own the AudioObjectPropertyListener + // Buffer size listener uses a separate channel so the handler + // can fire StreamInvalidated instead of DeviceNotAvailable. 
+ let (buffer_size_tx, buffer_size_rx) = mpsc::channel(); + let disconnect_tx_clone = disconnect_tx.clone(); + let buffer_size_tx_clone = buffer_size_tx.clone(); std::thread::spawn(move || { - let property_address = AudioObjectPropertyAddress { + let disconnect_address = AudioObjectPropertyAddress { mSelector: kAudioDevicePropertyDeviceIsAlive, mScope: kAudioObjectPropertyScopeGlobal, mElement: kAudioObjectPropertyElementMain, }; - - // Create the listener on this dedicated thread let disconnect_fn = move || { let _ = disconnect_tx_clone.send(()); }; - match AudioObjectPropertyListener::new(device_id, property_address, disconnect_fn) { - Ok(_listener) => { - let _ = ready_tx.send(Ok(())); - // Drop the listener on this thread after receiving a shutdown signal - let _ = shutdown_rx.recv(); - } + let _disconnect_listener = match AudioObjectPropertyListener::new( + device_id, + disconnect_address, + disconnect_fn, + ) { + Ok(listener) => listener, Err(e) => { let _ = ready_tx.send(Err(e)); + return; } - } + }; + + let _buffer_size_listener = if listen_buffer_size { + let buffer_size_address = AudioObjectPropertyAddress { + mSelector: kAudioDevicePropertyBufferFrameSize, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain, + }; + let buffer_size_fn = move || { + let _ = buffer_size_tx_clone.send(()); + }; + match AudioObjectPropertyListener::new( + device_id, + buffer_size_address, + buffer_size_fn, + ) { + Ok(listener) => Some(listener), + Err(e) => { + let _ = ready_tx.send(Err(e)); + return; + } + } + } else { + None + }; + + let _ = ready_tx.send(Ok(())); + let _ = shutdown_rx.recv(); }); // Wait for listener creation to complete or fail @@ -164,6 +197,28 @@ impl DisconnectManager { } }); + // Handle buffer size change events + if listen_buffer_size { + let stream_weak_clone = stream_weak.clone(); + let error_callback_clone = error_callback.clone(); + std::thread::spawn(move || { + while buffer_size_rx.recv().is_ok() { + if let 
Some(stream_arc) = stream_weak_clone.upgrade() { + if let Ok(mut stream_inner) = stream_arc.try_lock() { + let _ = stream_inner.pause(); + } + + invoke_error_callback( + &error_callback_clone, + crate::StreamError::StreamInvalidated, + ); + } else { + break; + } + } + }); + } + Ok(DisconnectManager { _shutdown_tx: shutdown_tx, }) @@ -178,7 +233,7 @@ impl DisconnectManager { /// is stopped before reclaiming the `Box`, preventing use-after-free. `Send` is sound because /// there is no concurrent mutable access—the build/drop thread never accesses the pointer /// while the audio unit is running, and only reclaims it after stopping the audio unit. -struct DuplexCallbackPtr(*mut device::DuplexProcWrapper); +struct DuplexCallbackPtr(*mut duplex::DuplexProcWrapper); // SAFETY: See above — the pointer is shared with CoreAudio's audio thread but never // accessed concurrently. The audio unit is stopped before reclaiming in drop. @@ -186,7 +241,7 @@ unsafe impl Send for DuplexCallbackPtr {} struct StreamInner { playing: bool, - audio_unit: AudioUnit, + audio_unit: ManuallyDrop, // Track the device with which the audio unit was spawned. // // We must do this so that we can avoid changing the device sample rate if there is already @@ -235,33 +290,28 @@ impl StreamInner { impl Drop for StreamInner { fn drop(&mut self) { - // Clean up duplex callback if present. + // SAFETY: audio_unit is not accessed after this point, and no references + // to self outlive this function. Dropping first ensures callbacks have + // stopped before reclaiming the duplex callback pointer below. + unsafe { + ManuallyDrop::drop(&mut self.audio_unit); + } + if let Some(DuplexCallbackPtr(ptr)) = self.duplex_callback_ptr { if !ptr.is_null() { - // Stop the audio unit to ensure the callback is no longer being called - // before reclaiming duplex_callback_ptr below. We must stop here regardless - // of AudioUnit::drop's behavior. 
- // Note: AudioUnit::drop will also call stop() — likely safe, but we stop here anyway. - let _ = self.audio_unit.stop(); // SAFETY: `ptr` was created via `Box::into_raw` in // `build_duplex_stream` and has not been reclaimed elsewhere. - // The audio unit was stopped above, so the callback no longer - // holds a reference to this pointer. + // The audio unit was dropped above, so no callbacks are active. unsafe { let _ = Box::from_raw(ptr); } } } - - // AudioUnit's own Drop will handle uninitialize and dispose - // _loopback_device's Drop will handle aggregate device cleanup } } pub struct Stream { inner: Arc>, - // Manages the device disconnection listener separately to allow Stream to be Send. - // The DisconnectManager contains the non-Send AudioObjectPropertyListener. _disconnect_manager: DisconnectManager, } @@ -269,13 +319,15 @@ impl Stream { fn new( inner: StreamInner, error_callback: ErrorCallback, + listen_buffer_size: bool, ) -> Result { let device_id = inner.device_id; let inner_arc = Arc::new(Mutex::new(inner)); let weak_inner = Arc::downgrade(&inner_arc); let error_callback = Arc::new(Mutex::new(error_callback)); - let disconnect_manager = DisconnectManager::new(device_id, weak_inner, error_callback)?; + let disconnect_manager = + DisconnectManager::new(device_id, weak_inner, error_callback, listen_buffer_size)?; Ok(Self { inner: inner_arc, From 2b4e804ba09b92660a7ae213b0caf3ffad82bb97 Mon Sep 17 00:00:00 2001 From: Mark Gulbrandsen Date: Mon, 30 Mar 2026 12:36:47 -0700 Subject: [PATCH 4/4] clean-up comments --- src/duplex.rs | 2 +- src/host/coreaudio/macos/duplex.rs | 111 +++++------------------------ src/host/coreaudio/macos/mod.rs | 15 ++-- src/traits.rs | 2 - 4 files changed, 24 insertions(+), 106 deletions(-) diff --git a/src/duplex.rs b/src/duplex.rs index 25f39ab36..7a201887e 100644 --- a/src/duplex.rs +++ b/src/duplex.rs @@ -27,7 +27,7 @@ impl DuplexCallbackInfo { } } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, 
PartialEq)] pub struct DuplexStreamConfig { pub input_channels: ChannelCount, pub output_channels: ChannelCount, diff --git a/src/host/coreaudio/macos/duplex.rs b/src/host/coreaudio/macos/duplex.rs index 864ce75b3..647ce4970 100644 --- a/src/host/coreaudio/macos/duplex.rs +++ b/src/host/coreaudio/macos/duplex.rs @@ -34,11 +34,6 @@ type DuplexProcFn = dyn FnMut( *mut AudioBufferList, ) -> i32; -/// Wrapper for the boxed duplex callback closure. -/// -/// This struct is allocated on the heap and its pointer is passed to CoreAudio -/// as the refcon. The extern "C" callback function casts the refcon back to -/// this type and calls the closure. pub(crate) struct DuplexProcWrapper { callback: Box, } @@ -52,16 +47,8 @@ pub(crate) struct DuplexProcWrapper { // 3. CoreAudio guarantees single-threaded callback invocation unsafe impl Send for DuplexProcWrapper {} -/// CoreAudio render callback for duplex audio. -/// -/// This is a thin wrapper that casts the refcon back to our DuplexProcWrapper -/// and calls the inner closure. The closure owns all the callback state via -/// move semantics, so no Mutex is needed. -/// -/// Note: `extern "C-unwind"` is required here because `AURenderCallbackStruct` -/// from coreaudio-sys types the `inputProc` field as `extern "C-unwind"`. -/// We use `catch_unwind` to prevent panics from unwinding through CoreAudio's -/// C frames, which would be undefined behavior. +// `extern "C-unwind"` matches `AURenderCallbackStruct::inputProc`. +// `catch_unwind` prevents panics from unwinding through CoreAudio's C frames. 
extern "C-unwind" fn duplex_input_proc( in_ref_con: NonNull, io_action_flags: NonNull, @@ -70,12 +57,10 @@ extern "C-unwind" fn duplex_input_proc( in_number_frames: u32, io_data: *mut AudioBufferList, ) -> i32 { - // SAFETY: `in_ref_con` points to a heap-allocated `DuplexProcWrapper` created - // via `Box::into_raw` in `build_duplex_stream`, and remains valid for the - // lifetime of the audio unit (reclaimed in `StreamInner::drop`). The `as_mut()` call - // produces an exclusive `&mut` reference, which is sound because CoreAudio - // guarantees single-threaded callback invocation — this function is never - // called concurrently, so only one `&mut` to the wrapper exists at a time. + // SAFETY: `in_ref_con` originates from `Box::into_raw` in `build_duplex_stream_raw`. + // `StreamInner::drop` stops the audio unit before reclaiming the pointer, + // so it remains valid for the lifetime of the callback. + // Called from a single render thread per audio unit, so `as_mut()` has exclusive access. let wrapper = unsafe { in_ref_con.cast::().as_mut() }; match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { (wrapper.callback)( @@ -92,11 +77,7 @@ extern "C-unwind" fn duplex_input_proc( } impl Device { - /// Build a duplex stream with synchronized input and output. - /// - /// This creates a single HAL AudioUnit with both input and output enabled, - /// ensuring they share the same hardware clock. 
- /// For details, see: https://developer.apple.com/library/archive/technotes/tn2091/_index.html + // See: https://developer.apple.com/library/archive/technotes/tn2091/_index.html pub(crate) fn build_duplex_stream_raw( &self, config: &crate::duplex::DuplexStreamConfig, @@ -109,19 +90,14 @@ impl Device { D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - // Validate that device supports duplex if !self.supports_duplex() { return Err(BuildStreamError::StreamConfigNotSupported); } - // Potentially change the device sample rate to match the config. set_sample_rate(self.audio_device_id, config.sample_rate)?; - // Create HAL AudioUnit - always use HalOutput for duplex let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; - // Enable BOTH input and output on the AudioUnit - // Enable input on Element 1 audio_unit.set_property( kAudioOutputUnitProperty_EnableIO, Scope::Input, @@ -129,7 +105,6 @@ impl Device { Some(&AUDIO_UNIT_IO_ENABLED), )?; - // Enable output on Element 0 (usually enabled by default, but be explicit) audio_unit.set_property( kAudioOutputUnitProperty_EnableIO, Scope::Output, @@ -137,7 +112,6 @@ impl Device { Some(&AUDIO_UNIT_IO_ENABLED), )?; - // Set device for the unit (applies to both input and output) audio_unit.set_property( kAudioOutputUnitProperty_CurrentDevice, Scope::Global, @@ -145,34 +119,20 @@ impl Device { Some(&self.audio_device_id), )?; - // Create StreamConfig for input side let input_stream_config = StreamConfig { channels: config.input_channels, sample_rate: config.sample_rate, buffer_size: config.buffer_size, }; - // Create StreamConfig for output side let output_stream_config = StreamConfig { channels: config.output_channels, sample_rate: config.sample_rate, buffer_size: config.buffer_size, }; - // Core Audio's HAL AU has two buses, each with a hardware side and a - // client (app) side. 
We set the stream format on the client-facing side; - // the AU's built-in converter handles translation to/from the hardware format. - // - // Mic ─[Scope::Input]──▶ Input Bus ──[Scope::Output]─▶ App - // (hardware side) (client side) - // - // App ─[Scope::Input]──▶ Output Bus ─[Scope::Output]─▶ Speaker - // (client side) (hardware side) - // - // So the client side is Scope::Output for the input bus (where we read - // captured samples) and Scope::Input for the output bus (where we write - // playback samples). See Apple TN2091 for details. - let input_asbd = asbd_from_config(&input_stream_config, sample_format); + // Client-side format: Scope::Output for input bus, Scope::Input for output bus. + let input_asbd = asbd_from_config(input_stream_config, sample_format); audio_unit.set_property( kAudioUnitProperty_StreamFormat, Scope::Output, @@ -180,7 +140,7 @@ impl Device { Some(&input_asbd), )?; - let output_asbd = asbd_from_config(&output_stream_config, sample_format); + let output_asbd = asbd_from_config(output_stream_config, sample_format); audio_unit.set_property( kAudioUnitProperty_StreamFormat, Scope::Input, @@ -188,8 +148,6 @@ impl Device { Some(&output_asbd), )?; - // Buffer frame size is a device-level property. Element::Output (0) is - // the standard convention for Scope::Global properties. if let BufferSize::Fixed(buffer_size) = &config.buffer_size { audio_unit.set_property( kAudioDevicePropertyBufferFrameSize, @@ -199,7 +157,6 @@ impl Device { )?; } - // Allocate input buffer for the current device buffer size. 
let current_buffer_size = get_device_buffer_frame_size(&audio_unit).map_err(|e| { BuildStreamError::BackendSpecific { err: BackendSpecificError { @@ -217,7 +174,6 @@ impl Device { let input_buffer_bytes = current_buffer_size * input_channels * sample_bytes; let mut input_buffer: Box<[u8]> = vec![0u8; input_buffer_bytes].into_boxed_slice(); - // Wrap error callback in Arc for sharing between callback and disconnect handler let error_callback = Arc::new(Mutex::new(error_callback)); let error_callback_for_callback = error_callback.clone(); @@ -238,7 +194,7 @@ impl Device { if io_data.is_null() { return kAudio_ParamError; } - // SAFETY: io_data validated as non-null above + // SAFETY: io_data validated as non-null above. let buffer_list = unsafe { &mut *io_data }; if buffer_list.mNumberBuffers == 0 { return kAudio_ParamError; @@ -253,16 +209,12 @@ impl Device { return kAudio_ParamError; } - // SAFETY: in_time_stamp is valid per CoreAudio contract + // SAFETY: in_time_stamp is valid per CoreAudio callback contract. let timestamp: &AudioTimeStamp = unsafe { in_time_stamp.as_ref() }; - // Create StreamInstant for callback_instant let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { Err(err) => { invoke_error_callback(&error_callback_for_callback, err.into()); - // Return 0 (noErr) to keep the stream alive while notifying the error - // callback. This matches input/output stream behavior and allows graceful - // degradation rather than stopping the stream on transient errors. return 0; } Ok(cb) => cb, @@ -274,7 +226,7 @@ impl Device { } let output_samples = buffer.mDataByteSize as usize / sample_bytes; - // SAFETY: buffer.mData validated as non-null above + // SAFETY: buffer.mData validated as non-null above. 
let mut output_data = unsafe { Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) }; @@ -289,7 +241,6 @@ impl Device { &error_callback_for_callback, ); - // Create callback info with latency-adjusted times let input_timestamp = crate::InputStreamTimestamp { callback: callback_instant, capture, @@ -299,9 +250,6 @@ impl Device { playback, }; - // Pull input from Element 1 using AudioUnitRender - // use the pre-allocated input_buffer - // Set up AudioBufferList pointing to our input buffer let mut input_buffer_list = AudioBufferList { mNumberBuffers: 1, mBuffers: [AudioBuffer { @@ -311,13 +259,8 @@ impl Device { }], }; - // SAFETY: AudioUnitRender is called with valid parameters: - // - raw_audio_unit is valid for the callback duration - // - input_buffer_list is created just above on the stack with a pointer to - // input_buffer, which is properly aligned (Rust's standard allocators return - // pointers aligned to at least 8 bytes, exceeding f32/i16 requirements) - // - input_buffer has been bounds-checked above to ensure sufficient capacity - // - All other parameters (timestamps, flags, etc.) come from CoreAudio itself + // SAFETY: raw_audio_unit is valid for the callback duration, + // input_buffer_list points to bounds-checked input_buffer. let status = unsafe { AudioUnitRender( raw_audio_unit, @@ -330,8 +273,6 @@ impl Device { }; if status != 0 { - // Report error but continue with silence for graceful degradation - // The application should decide what to do. 
invoke_error_callback( &error_callback_for_callback, StreamError::BackendSpecific { @@ -346,18 +287,8 @@ impl Device { input_buffer[..input_bytes].fill(0); } - // SAFETY: Creating Data from input_buffer is safe because: - // - input_buffer is a valid Vec owned by this closure - // - input_samples (num_frames * input_channels) was bounds-checked above to ensure - // input_samples * sample_bytes <= input_buffer.len() - // - AudioUnitRender just filled the buffer with valid audio data (or we filled - // it with silence on error) - // - The Data lifetime is scoped to this callback and doesn't outlive input_buffer - // - The pointer is suitably aligned: We successfully passed this buffer to - // AudioUnitRender (line 1314), which requires properly aligned buffers and would - // have failed if alignment were incorrect. Additionally, in practice, Rust's - // standard allocators (System, jemalloc, etc.) return pointers aligned to at - // least 8 bytes, which exceeds the requirements for f32 (4 bytes) and i16 (2 bytes). + // SAFETY: input_buffer is bounds-checked, filled by AudioUnitRender + // (or zeroed on error), and outlives this Data reference. 
let input_data = unsafe { Data::from_parts( input_buffer.as_mut_ptr() as *mut (), @@ -369,18 +300,15 @@ impl Device { let callback_info = DuplexCallbackInfo::new(input_timestamp, output_timestamp); data_callback(&input_data, &mut output_data, &callback_info); - // Return 0 (noErr) to indicate successful render 0 }, ); - // Box the wrapper and get raw pointer for CoreAudio let wrapper = Box::new(DuplexProcWrapper { callback: duplex_proc, }); let wrapper_ptr = Box::into_raw(wrapper); - // Set up the render callback let render_callback = AURenderCallbackStruct { inputProc: Some(duplex_input_proc), inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, @@ -393,7 +321,6 @@ impl Device { Some(&render_callback), )?; - // Create the stream inner, storing the callback pointer for cleanup let inner = StreamInner { playing: true, audio_unit: ManuallyDrop::new(audio_unit), @@ -402,17 +329,13 @@ impl Device { duplex_callback_ptr: Some(DuplexCallbackPtr(wrapper_ptr)), }; - // Always propagate disconnect errors for duplex streams. A duplex stream - // is broken when either direction changes device. let error_callback_clone = error_callback.clone(); let error_callback_for_stream: super::ErrorCallback = Box::new(move |err: StreamError| { invoke_error_callback(&error_callback_clone, err); }); - // Create the duplex stream let stream = Stream::new(inner, error_callback_for_stream, true)?; - // Start the audio unit stream .inner .lock() diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index 47c14e355..dd31af66e 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -109,8 +109,6 @@ impl DisconnectManager { let (disconnect_tx, disconnect_rx) = mpsc::channel(); let (ready_tx, ready_rx) = mpsc::channel(); - // Buffer size listener uses a separate channel so the handler - // can fire StreamInvalidated instead of DeviceNotAvailable. 
let (buffer_size_tx, buffer_size_rx) = mpsc::channel(); let disconnect_tx_clone = disconnect_tx.clone(); @@ -197,7 +195,6 @@ impl DisconnectManager { } }); - // Handle buffer size change events if listen_buffer_size { let stream_weak_clone = stream_weak.clone(); let error_callback_clone = error_callback.clone(); @@ -290,18 +287,18 @@ impl StreamInner { impl Drop for StreamInner { fn drop(&mut self) { - // SAFETY: audio_unit is not accessed after this point, and no references - // to self outlive this function. Dropping first ensures callbacks have - // stopped before reclaiming the duplex callback pointer below. + // SAFETY: This is the sole owning instance of audio_unit (wrapped in + // ManuallyDrop so we control drop order). Dropping it stops the audio + // unit, which guarantees CoreAudio will not invoke the render callback + // after this point. That makes it safe to reclaim the duplex callback + // pointer below. audio_unit is not accessed after this point. unsafe { ManuallyDrop::drop(&mut self.audio_unit); } if let Some(DuplexCallbackPtr(ptr)) = self.duplex_callback_ptr { if !ptr.is_null() { - // SAFETY: `ptr` was created via `Box::into_raw` in - // `build_duplex_stream` and has not been reclaimed elsewhere. - // The audio unit was dropped above, so no callbacks are active. + // SAFETY: ptr created via Box::into_raw, not reclaimed elsewhere. unsafe { let _ = Box::from_raw(ptr); } diff --git a/src/traits.rs b/src/traits.rs index fc2c9650c..6c29a499e 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -300,8 +300,6 @@ pub trait DeviceTrait { E: FnMut(StreamError) + Send + 'static; /// Create a duplex stream with synchronized input and output sharing the same hardware clock. - /// - /// Check [`supports_duplex`](Self::supports_duplex) before calling. See the example below for usage. fn build_duplex_stream( &self, config: &DuplexStreamConfig,