diff --git a/CHANGELOG.md b/CHANGELOG.md index c78c8a3a1..277139ba7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`. - **PipeWire**: New host for Linux and some BSDs using the PipeWire API. - **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API. +- `DeviceTrait::build_duplex_stream` and `build_duplex_stream_raw` for synchronized input/output. +- `duplex` module with `DuplexStreamConfig` and `DuplexCallbackInfo` types. +- **CoreAudio**: Duplex stream support with hardware-synchronized input/output. +- Example `duplex_feedback` demonstrating duplex stream usage. ### Changed @@ -91,6 +95,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **POTENTIALLY BREAKING**: `DeviceTrait` now includes `build_duplex_stream()` and `build_duplex_stream_raw()` methods. The default implementation returns `StreamConfigNotSupported`, so external implementations are compatible without changes. - Bump overall MSRV to 1.78. - **ALSA**: Update `alsa` dependency to 0.11. - **ALSA**: Bump MSRV to 1.82. diff --git a/Cargo.toml b/Cargo.toml index 8e28c9ca5..41ac7d253 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,6 +201,9 @@ name = "record_wav" [[example]] name = "synth_tones" +[[example]] +name = "duplex_feedback" + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 2badf60d3..5a3903254 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ This library currently supports the following: - Enumerate known supported input and output stream formats for a device. - Get the current default input and output stream formats for a device. - Build and run input and output PCM streams on a chosen device with a given stream format. +- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization. Currently, supported platforms include: @@ -174,7 +175,7 @@ If you are unable to build the library: ## Examples -CPAL comes with several examples in `examples/`. +CPAL comes with several examples in `examples/`, including `duplex_feedback` for hardware-synchronized duplex streams. Run an example with: ```bash diff --git a/examples/duplex_feedback.rs b/examples/duplex_feedback.rs new file mode 100644 index 000000000..1633caa95 --- /dev/null +++ b/examples/duplex_feedback.rs @@ -0,0 +1,99 @@ +// Duplex feedback example: feeds the input stream directly into the output. + +#[cfg(target_os = "macos")] +mod imp { + use clap::Parser; + use cpal::duplex::DuplexStreamConfig; + use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + use cpal::{BufferSize, ChannelCount, FrameCount, Sample, SampleRate}; + + #[derive(Parser, Debug)] + #[command(version, about = "CPAL duplex feedback example", long_about = None)] + struct Opt { + /// The audio device to use (must support duplex operation) + #[arg(short, long, value_name = "DEVICE")] + device: Option, + + /// Number of input channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + input_channels: ChannelCount, + + /// Number of output channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + output_channels: ChannelCount, + + /// Sample rate in Hz + #[arg(short, long, value_name = "RATE", default_value_t = 48000)] + sample_rate: SampleRate, + + /// Buffer size in frames (omit for device default) + #[arg(short, long, value_name = "FRAMES")] + buffer_size: Option, + } + + pub fn run() -> anyhow::Result<()> { + let opt = Opt::parse(); + let host = cpal::default_host(); + + let device = match opt.device { + Some(device_id_str) => { + let device_id = device_id_str.parse().expect("failed to parse device id"); + host.device_by_id(&device_id) + .expect(&format!("failed to find device with id: {}", device_id_str)) + } + None => host + .default_output_device() + .expect("no default output device"), + }; + + println!("Using device: \"{}\"", device.description()?.name()); + + let config = DuplexStreamConfig { + input_channels: opt.input_channels, + output_channels: opt.output_channels, + sample_rate: opt.sample_rate, + buffer_size: opt + .buffer_size + .map(|s| BufferSize::Fixed(s)) + .unwrap_or(BufferSize::Default), + }; + + println!("Building duplex stream with config: {config:?}"); + + let stream = device.build_duplex_stream::( + &config, + move |input, output, _info| { + output.fill(Sample::EQUILIBRIUM); + let copy_len = input.len().min(output.len()); + output[..copy_len].copy_from_slice(&input[..copy_len]); + }, + |err| eprintln!("Stream error: {err}"), + None, + )?; + + println!("Successfully built duplex stream."); + println!( + "Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {:?} frames", + opt.input_channels, opt.output_channels, opt.sample_rate, opt.buffer_size + ); + + println!("Starting duplex stream..."); + stream.play()?; + + println!("Playing for 10 seconds... (speak into your microphone)"); + std::thread::sleep(std::time::Duration::from_secs(10)); + + println!("Done!"); + Ok(()) + } +} + +fn main() { + #[cfg(target_os = "macos")] + imp::run().unwrap(); + + #[cfg(not(target_os = "macos"))] + { + eprintln!("Duplex streams are not supported on this platform."); + } +} diff --git a/src/duplex.rs b/src/duplex.rs new file mode 100644 index 000000000..7a201887e --- /dev/null +++ b/src/duplex.rs @@ -0,0 +1,36 @@ +use crate::{ChannelCount, InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; + +// Timing information for a duplex callback, combining input and output timestamps. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct DuplexCallbackInfo { + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, +} + +impl DuplexCallbackInfo { + pub fn new( + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, + ) -> Self { + Self { + input_timestamp, + output_timestamp, + } + } + + pub fn input_timestamp(&self) -> InputStreamTimestamp { + self.input_timestamp + } + + pub fn output_timestamp(&self) -> OutputStreamTimestamp { + self.output_timestamp + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct DuplexStreamConfig { + pub input_channels: ChannelCount, + pub output_channels: ChannelCount, + pub sample_rate: SampleRate, + pub buffer_size: crate::BufferSize, +} diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 7267d9d50..40bb08193 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -1,9 +1,11 @@ use super::OSStatus; use super::Stream; use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant}; +use crate::duplex::DuplexCallbackInfo; use crate::host::coreaudio::macos::loopback::LoopbackDevice; use crate::host::coreaudio::macos::StreamInner; use crate::traits::DeviceTrait; +use crate::StreamInstant; use crate::{ BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceId, DeviceIdError, DeviceNameError, InputCallbackInfo, @@ -34,6 +36,7 @@ use objc2_core_audio_types::{ }; use objc2_core_foundation::CFString; use objc2_core_foundation::Type; +use std::mem::ManuallyDrop; pub use super::enumerate::{ default_input_device, default_output_device, SupportedInputConfigs, SupportedOutputConfigs, @@ -49,9 +52,13 @@ use super::invoke_error_callback; use super::property_listener::AudioObjectPropertyListener; use coreaudio::audio_unit::macos_helpers::get_device_name; +pub(super) const AUDIO_UNIT_IO_ENABLED: u32 = 1; +/// Value for `kAudioOutputUnitProperty_EnableIO` to disable I/O on an AudioUnit element. +const AUDIO_UNIT_IO_DISABLED: u32 = 0; + /// Attempt to set the device sample rate to the provided rate. /// Return an error if the requested sample rate is not supported by the device. -fn set_sample_rate( +pub(super) fn set_sample_rate( audio_device_id: AudioObjectID, target_sample_rate: SampleRate, ) -> Result<(), BuildStreamError> { @@ -214,21 +221,19 @@ fn audio_unit_from_device(device: &Device, input: bool) -> Result( + callback_instant: StreamInstant, + delay: Duration, + error_callback: &Mutex, +) -> StreamInstant +where + E: FnMut(StreamError) + Send, +{ + callback_instant.sub(delay).unwrap_or_else(|| { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp underflow calculating capture time".into(), + }, + }, + ); + callback_instant + }) +} + +pub(super) fn estimate_playback_instant( + callback_instant: StreamInstant, + delay: Duration, + error_callback: &Mutex, +) -> StreamInstant +where + E: FnMut(StreamError) + Send, +{ + callback_instant.add(delay).unwrap_or_else(|| { + invoke_error_callback( + error_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: "Timestamp overflow calculating playback time".into(), + }, + }, + ); + callback_instant + }) +} + impl DeviceTrait for Device { type SupportedInputConfigs = SupportedInputConfigs; type SupportedOutputConfigs = SupportedOutputConfigs; @@ -336,6 +383,28 @@ impl DeviceTrait for Device { timeout, ) } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Device::build_duplex_stream_raw( + self, + config, + sample_format, + data_callback, + error_callback, + _timeout, + ) + } } #[derive(Clone, Eq, Hash, PartialEq)] @@ -796,9 +865,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = estimate_capture_instant(callback, delay, &error_callback); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -819,11 +886,13 @@ impl Device { let stream = Stream::new( StreamInner { playing: true, - audio_unit, + audio_unit: ManuallyDrop::new(audio_unit), device_id: self.audio_device_id, _loopback_device: loopback_aggregate, + duplex_callback_ptr: None, }, error_callback_for_stream, + false, )?; stream @@ -896,9 +965,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = estimate_playback_instant(callback, delay, &error_callback); let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; @@ -919,11 +986,13 @@ impl Device { let stream = Stream::new( StreamInner { playing: true, - audio_unit, + audio_unit: ManuallyDrop::new(audio_unit), device_id: self.audio_device_id, _loopback_device: None, + duplex_callback_ptr: None, }, error_callback_for_stream, + false, )?; stream @@ -1018,7 +1087,7 @@ fn setup_callback_vars( /// /// Buffer frame size is a device-level property that always uses Scope::Global + Element::Output, /// regardless of whether the audio unit is configured for input or output streams. -pub(crate) fn get_device_buffer_frame_size( +pub(super) fn get_device_buffer_frame_size( audio_unit: &AudioUnit, ) -> Result { // Device-level property: always use Scope::Global + Element::Output diff --git a/src/host/coreaudio/macos/duplex.rs b/src/host/coreaudio/macos/duplex.rs new file mode 100644 index 000000000..647ce4970 --- /dev/null +++ b/src/host/coreaudio/macos/duplex.rs @@ -0,0 +1,352 @@ +use super::{ + asbd_from_config, frames_to_duration, host_time_to_stream_instant, invoke_error_callback, + DuplexCallbackPtr, Stream, StreamInner, +}; +use crate::duplex::DuplexCallbackInfo; +use crate::{ + BackendSpecificError, BufferSize, BuildStreamError, Data, SampleFormat, StreamConfig, + StreamError, +}; +use coreaudio::audio_unit::{AudioUnit, Element, Scope}; +use objc2_audio_toolbox::{ + kAudioOutputUnitProperty_CurrentDevice, kAudioOutputUnitProperty_EnableIO, + kAudioUnitProperty_SetRenderCallback, kAudioUnitProperty_StreamFormat, AURenderCallbackStruct, + AudioUnitRender, AudioUnitRenderActionFlags, +}; +use objc2_core_audio::kAudioDevicePropertyBufferFrameSize; +use objc2_core_audio_types::{kAudio_ParamError, AudioBuffer, AudioBufferList, AudioTimeStamp}; +use std::ffi::c_void; +use std::mem::ManuallyDrop; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex}; + +use super::device::{ + estimate_capture_instant, estimate_playback_instant, get_device_buffer_frame_size, + set_sample_rate, Device, AUDIO_UNIT_IO_ENABLED, +}; +use crate::traits::DeviceTrait; + +type DuplexProcFn = dyn FnMut( + NonNull, + NonNull, + u32, // bus_number + u32, // num_frames + *mut AudioBufferList, +) -> i32; + +pub(crate) struct DuplexProcWrapper { + callback: Box, +} + +// SAFETY: DuplexProcWrapper is Send because: +// 1. The boxed closure captures only Send types (the DuplexCallback trait requires Send) +// 2. The raw pointer stored in StreamInner is accessed: +// - By CoreAudio's audio thread via `duplex_input_proc` (as the refcon) +// - During Drop, after stopping the audio unit (callback no longer running) +// These never overlap: Drop stops the audio unit before reclaiming the pointer. +// 3. CoreAudio guarantees single-threaded callback invocation +unsafe impl Send for DuplexProcWrapper {} + +// `extern "C-unwind"` matches `AURenderCallbackStruct::inputProc`. +// `catch_unwind` prevents panics from unwinding through CoreAudio's C frames. +extern "C-unwind" fn duplex_input_proc( + in_ref_con: NonNull, + io_action_flags: NonNull, + in_time_stamp: NonNull, + in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList, +) -> i32 { + // SAFETY: `in_ref_con` originates from `Box::into_raw` in `build_duplex_stream_raw`. + // `StreamInner::drop` stops the audio unit before reclaiming the pointer, + // so it remains valid for the lifetime of the callback. + // Called from a single render thread per audio unit, so `as_mut()` has exclusive access. + let wrapper = unsafe { in_ref_con.cast::().as_mut() }; + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + (wrapper.callback)( + io_action_flags, + in_time_stamp, + in_bus_number, + in_number_frames, + io_data, + ) + })) { + Ok(result) => result, + Err(_) => kAudio_ParamError, + } +} + +impl Device { + // See: https://developer.apple.com/library/archive/technotes/tn2091/_index.html + pub(crate) fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if !self.supports_duplex() { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + set_sample_rate(self.audio_device_id, config.sample_rate)?; + + let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; + + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Input, + Element::Input, + Some(&AUDIO_UNIT_IO_ENABLED), + )?; + + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Output, + Element::Output, + Some(&AUDIO_UNIT_IO_ENABLED), + )?; + + audio_unit.set_property( + kAudioOutputUnitProperty_CurrentDevice, + Scope::Global, + Element::Output, + Some(&self.audio_device_id), + )?; + + let input_stream_config = StreamConfig { + channels: config.input_channels, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + let output_stream_config = StreamConfig { + channels: config.output_channels, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Client-side format: Scope::Output for input bus, Scope::Input for output bus. + let input_asbd = asbd_from_config(input_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Output, + Element::Input, + Some(&input_asbd), + )?; + + let output_asbd = asbd_from_config(output_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Input, + Element::Output, + Some(&output_asbd), + )?; + + if let BufferSize::Fixed(buffer_size) = &config.buffer_size { + audio_unit.set_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + Some(buffer_size), + )?; + } + + let current_buffer_size = get_device_buffer_frame_size(&audio_unit).map_err(|e| { + BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: e.to_string(), + }, + } + })?; + + let sample_rate = config.sample_rate; + let device_buffer_frames = current_buffer_size; + let raw_audio_unit = *audio_unit.as_ref(); + let input_channels = config.input_channels as usize; + let sample_bytes = sample_format.sample_size(); + + let input_buffer_bytes = current_buffer_size * input_channels * sample_bytes; + let mut input_buffer: Box<[u8]> = vec![0u8; input_buffer_bytes].into_boxed_slice(); + + let error_callback = Arc::new(Mutex::new(error_callback)); + let error_callback_for_callback = error_callback.clone(); + + let mut data_callback = data_callback; + let buffer_size_changed = std::sync::atomic::AtomicBool::new(false); + + let duplex_proc: Box = Box::new( + move |io_action_flags: NonNull, + in_time_stamp: NonNull, + _in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList| + -> i32 { + if buffer_size_changed.load(std::sync::atomic::Ordering::Relaxed) { + return kAudio_ParamError; + } + + if io_data.is_null() { + return kAudio_ParamError; + } + // SAFETY: io_data validated as non-null above. + let buffer_list = unsafe { &mut *io_data }; + if buffer_list.mNumberBuffers == 0 { + return kAudio_ParamError; + } + + let num_frames = in_number_frames as usize; + let input_samples = num_frames * input_channels; + let input_bytes = input_samples * sample_bytes; + + if input_bytes != input_buffer.len() { + buffer_size_changed.store(true, std::sync::atomic::Ordering::Relaxed); + return kAudio_ParamError; + } + + // SAFETY: in_time_stamp is valid per CoreAudio callback contract. + let timestamp: &AudioTimeStamp = unsafe { in_time_stamp.as_ref() }; + + let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { + Err(err) => { + invoke_error_callback(&error_callback_for_callback, err.into()); + return 0; + } + Ok(cb) => cb, + }; + + let buffer = &mut buffer_list.mBuffers[0]; + if buffer.mData.is_null() { + return kAudio_ParamError; + } + let output_samples = buffer.mDataByteSize as usize / sample_bytes; + + // SAFETY: buffer.mData validated as non-null above. + let mut output_data = unsafe { + Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) + }; + + let delay = frames_to_duration(device_buffer_frames, sample_rate); + + let capture = + estimate_capture_instant(callback_instant, delay, &error_callback_for_callback); + let playback = estimate_playback_instant( + callback_instant, + delay, + &error_callback_for_callback, + ); + + let input_timestamp = crate::InputStreamTimestamp { + callback: callback_instant, + capture, + }; + let output_timestamp = crate::OutputStreamTimestamp { + callback: callback_instant, + playback, + }; + + let mut input_buffer_list = AudioBufferList { + mNumberBuffers: 1, + mBuffers: [AudioBuffer { + mNumberChannels: input_channels as u32, + mDataByteSize: input_bytes as u32, + mData: input_buffer.as_mut_ptr() as *mut std::ffi::c_void, + }], + }; + + // SAFETY: raw_audio_unit is valid for the callback duration, + // input_buffer_list points to bounds-checked input_buffer. + let status = unsafe { + AudioUnitRender( + raw_audio_unit, + io_action_flags.as_ptr(), + in_time_stamp, + 1, // Element 1 = input + in_number_frames, + NonNull::new_unchecked(&mut input_buffer_list), + ) + }; + + if status != 0 { + invoke_error_callback( + &error_callback_for_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!( + "AudioUnitRender failed for input: OSStatus {}", + status + ), + }, + }, + ); + input_buffer[..input_bytes].fill(0); + } + + // SAFETY: input_buffer is bounds-checked, filled by AudioUnitRender + // (or zeroed on error), and outlives this Data reference. + let input_data = unsafe { + Data::from_parts( + input_buffer.as_mut_ptr() as *mut (), + input_samples, + sample_format, + ) + }; + + let callback_info = DuplexCallbackInfo::new(input_timestamp, output_timestamp); + data_callback(&input_data, &mut output_data, &callback_info); + + 0 + }, + ); + + let wrapper = Box::new(DuplexProcWrapper { + callback: duplex_proc, + }); + let wrapper_ptr = Box::into_raw(wrapper); + + let render_callback = AURenderCallbackStruct { + inputProc: Some(duplex_input_proc), + inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, + }; + + audio_unit.set_property( + kAudioUnitProperty_SetRenderCallback, + Scope::Global, + Element::Output, + Some(&render_callback), + )?; + + let inner = StreamInner { + playing: true, + audio_unit: ManuallyDrop::new(audio_unit), + device_id: self.audio_device_id, + _loopback_device: None, + duplex_callback_ptr: Some(DuplexCallbackPtr(wrapper_ptr)), + }; + + let error_callback_clone = error_callback.clone(); + let error_callback_for_stream: super::ErrorCallback = Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }); + + let stream = Stream::new(inner, error_callback_for_stream, true)?; + + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })? + .audio_unit + .start()?; + + Ok(stream) + } +} diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index d94525037..dd31af66e 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -7,17 +7,19 @@ use crate::traits::{HostTrait, StreamTrait}; use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError}; use coreaudio::audio_unit::AudioUnit; use objc2_core_audio::AudioDeviceID; +use std::mem::ManuallyDrop; use std::sync::{mpsc, Arc, Mutex, Weak}; pub use self::enumerate::{default_input_device, default_output_device, Devices}; use objc2_core_audio::{ - kAudioDevicePropertyDeviceIsAlive, kAudioObjectPropertyElementMain, - kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, + kAudioDevicePropertyBufferFrameSize, kAudioDevicePropertyDeviceIsAlive, + kAudioObjectPropertyElementMain, kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, }; use property_listener::AudioObjectPropertyListener; mod device; +mod duplex; pub mod enumerate; mod loopback; mod property_listener; @@ -61,7 +63,7 @@ type ErrorCallback = Box; /// Invoke error callback, recovering from poisoned mutex if needed. /// Returns true if callback was invoked, false if skipped due to WouldBlock. #[inline] -fn invoke_error_callback(error_callback: &Arc>, err: crate::StreamError) -> bool +fn invoke_error_callback(error_callback: &Mutex, err: crate::StreamError) -> bool where E: FnMut(crate::StreamError) + Send, { @@ -101,34 +103,63 @@ impl DisconnectManager { device_id: AudioDeviceID, stream_weak: Weak>, error_callback: Arc>, + listen_buffer_size: bool, ) -> Result { let (shutdown_tx, shutdown_rx) = mpsc::channel(); let (disconnect_tx, disconnect_rx) = mpsc::channel(); let (ready_tx, ready_rx) = mpsc::channel(); - // Spawn dedicated thread to own the AudioObjectPropertyListener + let (buffer_size_tx, buffer_size_rx) = mpsc::channel(); + let disconnect_tx_clone = disconnect_tx.clone(); + let buffer_size_tx_clone = buffer_size_tx.clone(); std::thread::spawn(move || { - let property_address = AudioObjectPropertyAddress { + let disconnect_address = AudioObjectPropertyAddress { mSelector: kAudioDevicePropertyDeviceIsAlive, mScope: kAudioObjectPropertyScopeGlobal, mElement: kAudioObjectPropertyElementMain, }; - - // Create the listener on this dedicated thread let disconnect_fn = move || { let _ = disconnect_tx_clone.send(()); }; - match AudioObjectPropertyListener::new(device_id, property_address, disconnect_fn) { - Ok(_listener) => { - let _ = ready_tx.send(Ok(())); - // Drop the listener on this thread after receiving a shutdown signal - let _ = shutdown_rx.recv(); - } + let _disconnect_listener = match AudioObjectPropertyListener::new( + device_id, + disconnect_address, + disconnect_fn, + ) { + Ok(listener) => listener, Err(e) => { let _ = ready_tx.send(Err(e)); + return; } - } + }; + + let _buffer_size_listener = if listen_buffer_size { + let buffer_size_address = AudioObjectPropertyAddress { + mSelector: kAudioDevicePropertyBufferFrameSize, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain, + }; + let buffer_size_fn = move || { + let _ = buffer_size_tx_clone.send(()); + }; + match AudioObjectPropertyListener::new( + device_id, + buffer_size_address, + buffer_size_fn, + ) { + Ok(listener) => Some(listener), + Err(e) => { + let _ = ready_tx.send(Err(e)); + return; + } + } + } else { + None + }; + + let _ = ready_tx.send(Ok(())); + let _ = shutdown_rx.recv(); }); // Wait for listener creation to complete or fail @@ -164,15 +195,50 @@ impl DisconnectManager { } }); + if listen_buffer_size { + let stream_weak_clone = stream_weak.clone(); + let error_callback_clone = error_callback.clone(); + std::thread::spawn(move || { + while buffer_size_rx.recv().is_ok() { + if let Some(stream_arc) = stream_weak_clone.upgrade() { + if let Ok(mut stream_inner) = stream_arc.try_lock() { + let _ = stream_inner.pause(); + } + + invoke_error_callback( + &error_callback_clone, + crate::StreamError::StreamInvalidated, + ); + } else { + break; + } + } + }); + } + Ok(DisconnectManager { _shutdown_tx: shutdown_tx, }) } } +/// Owned pointer to the duplex callback wrapper that is safe to send across threads. +/// +/// SAFETY: The pointer is created via `Box::into_raw` on the build thread and shared with +/// CoreAudio via `inputProcRefCon`. CoreAudio dereferences it on every render callback on +/// its single-threaded audio thread for the lifetime of the stream. On drop, the audio unit +/// is stopped before reclaiming the `Box`, preventing use-after-free. `Send` is sound because +/// there is no concurrent mutable access—the build/drop thread never accesses the pointer +/// while the audio unit is running, and only reclaims it after stopping the audio unit. +struct DuplexCallbackPtr(*mut duplex::DuplexProcWrapper); + +// SAFETY: See above — the pointer is shared with CoreAudio's audio thread but never +// accessed concurrently. The audio unit is stopped before reclaiming in drop. +unsafe impl Send for DuplexCallbackPtr {} + struct StreamInner { playing: bool, - audio_unit: AudioUnit, + audio_unit: ManuallyDrop, // Track the device with which the audio unit was spawned. // // We must do this so that we can avoid changing the device sample rate if there is already @@ -182,13 +248,22 @@ struct StreamInner { /// Manage the lifetime of the aggregate device used /// for loopback recording _loopback_device: Option, + /// Pointer to the duplex callback wrapper, manually managed for duplex streams. + /// + /// coreaudio-rs doesn't support duplex streams (enabling both input and output + /// simultaneously), so we cannot use its `set_render_callback` API which would + /// manage the callback lifetime automatically. Instead, we manually manage this + /// callback pointer (created via `Box::into_raw`) and clean it up in Drop. + /// + /// This is None for regular input/output streams. + duplex_callback_ptr: Option, } impl StreamInner { fn play(&mut self) -> Result<(), PlayStreamError> { if !self.playing { if let Err(e) = self.audio_unit.start() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -200,7 +275,7 @@ impl StreamInner { fn pause(&mut self) -> Result<(), PauseStreamError> { if self.playing { if let Err(e) = self.audio_unit.stop() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -210,10 +285,30 @@ impl StreamInner { } } +impl Drop for StreamInner { + fn drop(&mut self) { + // SAFETY: This is the sole owning instance of audio_unit (wrapped in + // ManuallyDrop so we control drop order). Dropping it stops the audio + // unit, which guarantees CoreAudio will not invoke the render callback + // after this point. That makes it safe to reclaim the duplex callback + // pointer below. audio_unit is not accessed after this point. + unsafe { + ManuallyDrop::drop(&mut self.audio_unit); + } + + if let Some(DuplexCallbackPtr(ptr)) = self.duplex_callback_ptr { + if !ptr.is_null() { + // SAFETY: ptr created via Box::into_raw, not reclaimed elsewhere. + unsafe { + let _ = Box::from_raw(ptr); + } + } + } + } +} + pub struct Stream { inner: Arc>, - // Manages the device disconnection listener separately to allow Stream to be Send. - // The DisconnectManager contains the non-Send AudioObjectPropertyListener. _disconnect_manager: DisconnectManager, } @@ -221,13 +316,15 @@ impl Stream { fn new( inner: StreamInner, error_callback: ErrorCallback, + listen_buffer_size: bool, ) -> Result { let device_id = inner.device_id; let inner_arc = Arc::new(Mutex::new(inner)); let weak_inner = Arc::downgrade(&inner_arc); let error_callback = Arc::new(Mutex::new(error_callback)); - let disconnect_manager = DisconnectManager::new(device_id, weak_inner, error_callback)?; + let disconnect_manager = + DisconnectManager::new(device_id, weak_inner, error_callback, listen_buffer_size)?; Ok(Self { inner: inner_arc, diff --git a/src/lib.rs b/src/lib.rs index 71787ccfc..49ea11d66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,6 +192,7 @@ use std::convert::TryInto; use std::time::Duration; pub mod device_description; +pub mod duplex; mod error; mod host; pub mod platform; diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 116d1b1d0..8493b7a5b 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -522,6 +522,29 @@ macro_rules! impl_platform_host { )* } } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &mut crate::Data, &crate::duplex::DuplexCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + match self.0 { + $( + $(#[cfg($feat)])? + DeviceInner::$HostVariant(ref d) => d + .build_duplex_stream_raw(config, sample_format, data_callback, error_callback, timeout) + .map(StreamInner::$HostVariant) + .map(Stream::from), + )* + } + } } impl crate::traits::HostTrait for Host { diff --git a/src/traits.rs b/src/traits.rs index 0b0ee0e8c..6c29a499e 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -8,6 +8,7 @@ use std::time::Duration; use crate::{ + duplex::{DuplexCallbackInfo, DuplexStreamConfig}, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, PauseStreamError, PlayStreamError, SampleFormat, SizedSample, StreamConfig, @@ -94,10 +95,12 @@ pub trait DeviceTrait { type SupportedInputConfigs: Iterator; /// The iterator type yielding supported output stream formats. type SupportedOutputConfigs: Iterator; - /// The stream type created by [`build_input_stream_raw`] and [`build_output_stream_raw`]. + /// The stream type created by [`build_input_stream_raw`], [`build_output_stream_raw`], + /// and [`build_duplex_stream_raw`]. /// /// [`build_input_stream_raw`]: Self::build_input_stream_raw /// [`build_output_stream_raw`]: Self::build_output_stream_raw + /// [`build_duplex_stream_raw`]: Self::build_duplex_stream_raw type Stream: StreamTrait; /// The human-readable name of the device. @@ -139,6 +142,15 @@ pub trait DeviceTrait { .is_ok_and(|mut iter| iter.next().is_some()) } + /// True if the device supports duplex (simultaneous input and output), otherwise false. + /// + /// Duplex operation requires the device to support both input and output with a shared + /// hardware clock. This is typically true for audio interfaces but may not be available + /// on all devices (e.g., output-only speakers or input-only microphones). + fn supports_duplex(&self) -> bool { + self.supports_input() && self.supports_output() + } + /// An iterator yielding formats that are supported by the backend. /// /// Can return an error if the device is no longer valid (e.g. it has been disconnected). @@ -286,6 +298,67 @@ pub trait DeviceTrait { where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static; + + /// Create a duplex stream with synchronized input and output sharing the same hardware clock. + fn build_duplex_stream( + &self, + config: &DuplexStreamConfig, + mut data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + T: SizedSample, + D: FnMut(&[T], &mut [T], &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + self.build_duplex_stream_raw( + config, + T::FORMAT, + move |input, output, info| { + data_callback( + input + .as_slice() + .expect("host supplied incorrect sample type"), + output + .as_slice_mut() + .expect("host supplied incorrect sample type"), + info, + ) + }, + error_callback, + timeout, + ) + } + + /// Create a dynamically typed duplex stream. + /// + /// This method allows working with sample data as raw bytes, useful when the sample + /// format is determined at runtime. For compile-time known formats, prefer + /// [`build_duplex_stream`](Self::build_duplex_stream). + /// + /// # Parameters + /// + /// * `config` - The duplex stream configuration specifying channels, sample rate, and buffer size. + /// * `sample_format` - The sample format of the audio data. + /// * `data_callback` - Called periodically with synchronized input and output buffers as [`Data`]. + /// * `error_callback` - Called when a stream error occurs (e.g., device disconnected). + /// * `timeout` - Optional timeout for backend operations. `None` indicates blocking behavior, + /// `Some(duration)` sets a maximum wait time. Not all backends support timeouts. + fn build_duplex_stream_raw( + &self, + _config: &DuplexStreamConfig, + _sample_format: SampleFormat, + _data_callback: D, + _error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Err(BuildStreamError::StreamConfigNotSupported) + } } /// A stream created from [`Device`](DeviceTrait), with methods to control playback.