Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`.
- **PipeWire**: New host for Linux and some BSDs using the PipeWire API.
- **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API.
- `DeviceTrait::build_duplex_stream` and `build_duplex_stream_raw` for synchronized input/output.
- `duplex` module with `DuplexStreamConfig` and `DuplexCallbackInfo` types.
- **CoreAudio**: Duplex stream support with hardware-synchronized input/output.
- Example `duplex_feedback` demonstrating duplex stream usage.

### Changed

Expand Down Expand Up @@ -91,6 +95,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **POTENTIALLY BREAKING**: `DeviceTrait` now includes `build_duplex_stream()` and `build_duplex_stream_raw()` methods. The default implementation returns `StreamConfigNotSupported`, so external implementations are compatible without changes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This landed in the wrong section, probably due to rebasing.

You don't need to include "POTENTIALLY BREAKING" - SemVer will be bumped and this is listed under "Changed" so that's good enough for me.

- Bump overall MSRV to 1.78.
- **ALSA**: Update `alsa` dependency to 0.11.
- **ALSA**: Bump MSRV to 1.82.
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ name = "record_wav"
[[example]]
name = "synth_tones"

[[example]]
name = "duplex_feedback"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ This library currently supports the following:
- Enumerate known supported input and output stream formats for a device.
- Get the current default input and output stream formats for a device.
- Build and run input and output PCM streams on a chosen device with a given stream format.
- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization.

Currently, supported platforms include:

Expand Down Expand Up @@ -174,7 +175,7 @@ If you are unable to build the library:

## Examples

CPAL comes with several examples in `examples/`.
CPAL comes with several examples in `examples/`, including `duplex_feedback` for hardware-synchronized duplex streams.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to be so specific.


Run an example with:
```bash
Expand Down
99 changes: 99 additions & 0 deletions examples/duplex_feedback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Duplex feedback example: feeds the input stream directly into the output.

#[cfg(target_os = "macos")]
mod imp {
use clap::Parser;
use cpal::duplex::DuplexStreamConfig;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{BufferSize, ChannelCount, FrameCount, Sample, SampleRate};

#[derive(Parser, Debug)]
#[command(version, about = "CPAL duplex feedback example", long_about = None)]
struct Opt {
/// The audio device to use (must support duplex operation)
#[arg(short, long, value_name = "DEVICE")]
device: Option<String>,

/// Number of input channels
#[arg(long, value_name = "CHANNELS", default_value_t = 2)]
input_channels: ChannelCount,

/// Number of output channels
#[arg(long, value_name = "CHANNELS", default_value_t = 2)]
output_channels: ChannelCount,

/// Sample rate in Hz
#[arg(short, long, value_name = "RATE", default_value_t = 48000)]
sample_rate: SampleRate,

/// Buffer size in frames (omit for device default)
#[arg(short, long, value_name = "FRAMES")]
buffer_size: Option<FrameCount>,
}

pub fn run() -> anyhow::Result<()> {
let opt = Opt::parse();
let host = cpal::default_host();

let device = match opt.device {
Some(device_id_str) => {
let device_id = device_id_str.parse().expect("failed to parse device id");
host.device_by_id(&device_id)
.expect(&format!("failed to find device with id: {}", device_id_str))
}
None => host
.default_output_device()
.expect("no default output device"),
};

println!("Using device: \"{}\"", device.description()?.name());

let config = DuplexStreamConfig {
input_channels: opt.input_channels,
output_channels: opt.output_channels,
sample_rate: opt.sample_rate,
buffer_size: opt
.buffer_size
.map(|s| BufferSize::Fixed(s))
.unwrap_or(BufferSize::Default),
};

println!("Building duplex stream with config: {config:?}");

let stream = device.build_duplex_stream::<f32, _, _>(
&config,
move |input, output, _info| {
output.fill(Sample::EQUILIBRIUM);
let copy_len = input.len().min(output.len());
output[..copy_len].copy_from_slice(&input[..copy_len]);
},
|err| eprintln!("Stream error: {err}"),
None,
)?;

println!("Successfully built duplex stream.");
println!(
"Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {:?} frames",
opt.input_channels, opt.output_channels, opt.sample_rate, opt.buffer_size
);

println!("Starting duplex stream...");
stream.play()?;

println!("Playing for 10 seconds... (speak into your microphone)");
std::thread::sleep(std::time::Duration::from_secs(10));

println!("Done!");
Ok(())
}
}

fn main() {
#[cfg(target_os = "macos")]
imp::run().unwrap();

#[cfg(not(target_os = "macos"))]
{
eprintln!("Duplex streams are not supported on this platform.");
}
}
36 changes: 36 additions & 0 deletions src/duplex.rs
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module should probably have some short Rustdoc, as it's private. Compare with InputCallbackInfo/OutputCallbackInfo for tone and verbosity.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::{ChannelCount, InputStreamTimestamp, OutputStreamTimestamp, SampleRate};

// Timing information for a duplex callback, combining input and output timestamps.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DuplexCallbackInfo {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these and other public types, please ensure that they have public Rustdoc.

input_timestamp: InputStreamTimestamp,
output_timestamp: OutputStreamTimestamp,
}

impl DuplexCallbackInfo {
pub fn new(
input_timestamp: InputStreamTimestamp,
output_timestamp: OutputStreamTimestamp,
) -> Self {
Self {
input_timestamp,
output_timestamp,
}
}

pub fn input_timestamp(&self) -> InputStreamTimestamp {
self.input_timestamp
}

pub fn output_timestamp(&self) -> OutputStreamTimestamp {
self.output_timestamp
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DuplexStreamConfig {
pub input_channels: ChannelCount,
pub output_channels: ChannelCount,
pub sample_rate: SampleRate,
pub buffer_size: crate::BufferSize,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: may be imported for consistency with the other types.

}
97 changes: 83 additions & 14 deletions src/host/coreaudio/macos/device.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::OSStatus;
use super::Stream;
use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant};
use crate::duplex::DuplexCallbackInfo;
use crate::host::coreaudio::macos::loopback::LoopbackDevice;
use crate::host::coreaudio::macos::StreamInner;
use crate::traits::DeviceTrait;
use crate::StreamInstant;
use crate::{
BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
DefaultStreamConfigError, DeviceId, DeviceIdError, DeviceNameError, InputCallbackInfo,
Expand Down Expand Up @@ -34,6 +36,7 @@ use objc2_core_audio_types::{
};
use objc2_core_foundation::CFString;
use objc2_core_foundation::Type;
use std::mem::ManuallyDrop;

pub use super::enumerate::{
default_input_device, default_output_device, SupportedInputConfigs, SupportedOutputConfigs,
Expand All @@ -49,9 +52,13 @@ use super::invoke_error_callback;
use super::property_listener::AudioObjectPropertyListener;
use coreaudio::audio_unit::macos_helpers::get_device_name;

pub(super) const AUDIO_UNIT_IO_ENABLED: u32 = 1;
/// Value for `kAudioOutputUnitProperty_EnableIO` to disable I/O on an AudioUnit element.
const AUDIO_UNIT_IO_DISABLED: u32 = 0;

/// Attempt to set the device sample rate to the provided rate.
/// Return an error if the requested sample rate is not supported by the device.
fn set_sample_rate(
pub(super) fn set_sample_rate(
audio_device_id: AudioObjectID,
target_sample_rate: SampleRate,
) -> Result<(), BuildStreamError> {
Expand Down Expand Up @@ -214,21 +221,19 @@ fn audio_unit_from_device(device: &Device, input: bool) -> Result<AudioUnit, cor

if input {
// Enable input processing.
let enable_input = 1u32;
audio_unit.set_property(
kAudioOutputUnitProperty_EnableIO,
Scope::Input,
Element::Input,
Some(&enable_input),
Some(&AUDIO_UNIT_IO_ENABLED),
)?;

// Disable output processing.
let disable_output = 0u32;
audio_unit.set_property(
kAudioOutputUnitProperty_EnableIO,
Scope::Output,
Element::Output,
Some(&disable_output),
Some(&AUDIO_UNIT_IO_DISABLED),
)?;
}

Expand Down Expand Up @@ -260,6 +265,48 @@ fn get_io_buffer_frame_size_range(
})
}

pub(super) fn estimate_capture_instant<E>(
callback_instant: StreamInstant,
delay: Duration,
error_callback: &Mutex<E>,
) -> StreamInstant
where
E: FnMut(StreamError) + Send,
{
callback_instant.sub(delay).unwrap_or_else(|| {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm soon going to merge #1139 which will change the StreamInstant API somewhat. Here's what you might then replace it with on CoreAudio: https://github.com/RustAudio/cpal/pull/1139/changes#diff-8e684b369b47b2adec5fdeb79b478a2533366a37c684eb9a1e686abed4939bc6

invoke_error_callback(
error_callback,
StreamError::BackendSpecific {
err: BackendSpecificError {
description: "Timestamp underflow calculating capture time".into(),
},
},
);
callback_instant
})
}

pub(super) fn estimate_playback_instant<E>(
callback_instant: StreamInstant,
delay: Duration,
error_callback: &Mutex<E>,
) -> StreamInstant
where
E: FnMut(StreamError) + Send,
{
callback_instant.add(delay).unwrap_or_else(|| {
invoke_error_callback(
error_callback,
StreamError::BackendSpecific {
err: BackendSpecificError {
description: "Timestamp overflow calculating playback time".into(),
},
},
);
callback_instant
})
}

impl DeviceTrait for Device {
type SupportedInputConfigs = SupportedInputConfigs;
type SupportedOutputConfigs = SupportedOutputConfigs;
Expand Down Expand Up @@ -336,6 +383,28 @@ impl DeviceTrait for Device {
timeout,
)
}

fn build_duplex_stream_raw<D, E>(
&self,
config: &crate::duplex::DuplexStreamConfig,
sample_format: SampleFormat,
data_callback: D,
error_callback: E,
_timeout: Option<Duration>,
) -> Result<Self::Stream, BuildStreamError>
where
D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
Device::build_duplex_stream_raw(
self,
config,
sample_format,
data_callback,
error_callback,
_timeout,
)
}
}

#[derive(Clone, Eq, Hash, PartialEq)]
Expand Down Expand Up @@ -796,9 +865,7 @@ impl Device {
let latency_frames =
device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames;
let delay = frames_to_duration(latency_frames, sample_rate);
let capture = callback
.sub(delay)
.expect("`capture` occurs before origin of alsa `StreamInstant`");
let capture = estimate_capture_instant(callback, delay, &error_callback);
let timestamp = crate::InputStreamTimestamp { callback, capture };

let info = InputCallbackInfo { timestamp };
Expand All @@ -819,11 +886,13 @@ impl Device {
let stream = Stream::new(
StreamInner {
playing: true,
audio_unit,
audio_unit: ManuallyDrop::new(audio_unit),
device_id: self.audio_device_id,
_loopback_device: loopback_aggregate,
duplex_callback_ptr: None,
},
error_callback_for_stream,
false,
)?;

stream
Expand Down Expand Up @@ -896,9 +965,7 @@ impl Device {
let latency_frames =
device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames;
let delay = frames_to_duration(latency_frames, sample_rate);
let playback = callback
.add(delay)
.expect("`playback` occurs beyond representation supported by `StreamInstant`");
let playback = estimate_playback_instant(callback, delay, &error_callback);
let timestamp = crate::OutputStreamTimestamp { callback, playback };

let info = OutputCallbackInfo { timestamp };
Expand All @@ -919,11 +986,13 @@ impl Device {
let stream = Stream::new(
StreamInner {
playing: true,
audio_unit,
audio_unit: ManuallyDrop::new(audio_unit),
device_id: self.audio_device_id,
_loopback_device: None,
duplex_callback_ptr: None,
},
error_callback_for_stream,
false,
)?;

stream
Expand Down Expand Up @@ -1018,7 +1087,7 @@ fn setup_callback_vars(
///
/// Buffer frame size is a device-level property that always uses Scope::Global + Element::Output,
/// regardless of whether the audio unit is configured for input or output streams.
pub(crate) fn get_device_buffer_frame_size(
pub(super) fn get_device_buffer_frame_size(
audio_unit: &AudioUnit,
) -> Result<usize, coreaudio::Error> {
// Device-level property: always use Scope::Global + Element::Output
Expand Down
Loading
Loading