From 4f513beeb30336bc1f9e61d1b8c082d702c593c9 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 29 Mar 2026 22:43:41 +0200 Subject: [PATCH 01/17] feat: add StreamTrait::now() and overhaul StreamInstant API - Adds now() to StreamTrait so callers can query the stream clock outside the audio callback on the same time base as callback/capture/playback timestamps. - StreamInstant is reworked to mirror std::time::Instant with u64 storage (all stream clocks are monotonic and non-negative) and similar functions parameters, and return types. Closes #472 --- CHANGELOG.md | 7 + Cargo.toml | 3 + UPGRADING.md | 74 ++++++++ examples/custom.rs | 20 +- src/host/aaudio/convert.rs | 19 +- src/host/aaudio/mod.rs | 14 +- src/host/alsa/mod.rs | 32 ++-- src/host/asio/mod.rs | 8 + src/host/asio/stream.rs | 14 +- src/host/audioworklet/mod.rs | 16 +- src/host/coreaudio/ios/mod.rs | 13 +- src/host/coreaudio/macos/device.rs | 8 +- src/host/coreaudio/macos/mod.rs | 5 + src/host/coreaudio/mod.rs | 4 +- src/host/custom/mod.rs | 9 + src/host/emscripten/mod.rs | 8 +- src/host/jack/stream.rs | 35 ++-- src/host/pipewire/stream.rs | 70 ++++--- src/host/pulseaudio/stream.rs | 51 +++-- src/host/wasapi/stream.rs | 28 ++- src/host/webaudio/mod.rs | 4 + src/lib.rs | 289 ++++++++++++++++++++++------- src/platform/mod.rs | 11 ++ src/traits.rs | 14 +- 24 files changed, 527 insertions(+), 229 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c78c8a3a1..ffe9f7e40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `StreamConfig` now implements `Copy`. - `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback. - `device_by_id` is now dispatched to each backend's implementation, allowing to override it. +- `StreamTrait::now()` to query the current instant on the stream's clock. +- `StreamInstant` API changed and extended to mirror `std::time::Instant`/`Duration`. See + [UPGRADING.md](UPGRADING.md) for migration details. - **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`. - **PipeWire**: New host for Linux and some BSDs using the PipeWire API. - **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API. @@ -48,6 +51,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **WebAudio**: Timestamps now include base and output latency. - **WebAudio**: Initial buffer scheduling offset now scales with buffer duration. +### Removed + +- Replaced `StreamInstant::add()` and `sub()` by `checked_add()`/`+` and `checked_sub()`/`-`. + ### Fixed - Reintroduce `audio_thread_priority` feature. diff --git a/Cargo.toml b/Cargo.toml index 8e28c9ca5..118b509da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,12 +85,14 @@ clap = { version = ">=4.0, <=4.5.57", features = ["derive"] } # When updating this, also update the "windows-version" matrix in the CI workflow. [target.'cfg(target_os = "windows")'.dependencies] windows = { version = ">=0.59, <=0.62", features = [ + "Win32_Media", "Win32_Media_Audio", "Win32_Foundation", "Win32_Devices_Properties", "Win32_Media_KernelStreaming", "Win32_System_Com_StructuredStorage", "Win32_System_Threading", + "Win32_System_Performance", "Win32_Security", "Win32_System_SystemServices", "Win32_System_Variant", @@ -183,6 +185,7 @@ ndk = { version = "0.9", default-features = false, features = [ ] } ndk-context = "0.1" jni = "0.21" +libc = "0.2" num-derive = "0.4" num-traits = "0.2" diff --git a/UPGRADING.md b/UPGRADING.md index 4ece2f2f7..ccd1c3b0e 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -8,6 +8,13 @@ This guide covers breaking changes requiring code updates. See [CHANGELOG.md](CH - [ ] Optionally handle the new `DeviceBusy` variant for retryable device errors - [ ] Change `build_*_stream` call sites to pass `StreamConfig` by value (drop the `&`) - [ ] For custom hosts, change `DeviceTrait` implementations to accept `StreamConfig` by value. +- [ ] Remove `instant.duration_since(e)` unwraps; it now returns `Duration` (saturating). +- [ ] Change `instant.add(d)` to `instant.checked_add(d)` (or use `instant + d`). +- [ ] Change `instant.sub(d)` to `instant.checked_sub(d)` (or use `instant - d`). +- [ ] Update `StreamInstant::new(secs, nanos)` call sites: `secs` is now `u64`. +- [ ] Update `StreamInstant::from_nanos(nanos)` call sites: `nanos` is now `u64`. +- [ ] Remove any uses of `StreamInstant::as_nanos()` — see migration note below. +- [ ] Update `duration_since` call sites to pass by value (drop the `&`). ## 1. Error enums are now `#[non_exhaustive]` @@ -61,6 +68,73 @@ let stream = device.build_output_stream(config, data_fn, err_fn, None)?; If you implement `DeviceTrait` on your own type (via the `custom` feature), update your `build_input_stream_raw` and `build_output_stream_raw` signatures from `config: &StreamConfig` to `config: StreamConfig`. Any `config.clone()` calls before `move` closures can also be removed. +## 4. `StreamInstant` API overhaul + +The `StreamInstant` API has been aligned with `std::time::Instant` and `std::time::Duration`. + +### `duration_since` now returns `Duration` (saturating) + +**What changed:** `duration_since` now returns `Duration` directly, saturating to `Duration::ZERO` +when the argument is later than `self`, instead of returning `Option`. + +```rust +// Before (v0.17): returned Option, argument by reference +if let Some(d) = callback.duration_since(&start) { + println!("elapsed: {d:?}"); +} + +// After (v0.18): returns Duration (saturating), argument by value +let d = callback.duration_since(start); +println!("elapsed: {d:?}"); + +// For the previous Option-returning behaviour, use checked_duration_since: +if let Some(d) = callback.checked_duration_since(start) { + println!("elapsed: {d:?}"); +} +``` + +**Why:** Mirrors the behavior of `std::time::Instant::duration_since` as of Rust standard library. + +### `add` / `sub` renamed to `checked_add` / `checked_sub`; operator impls added + +**What changed:** The `add` and `sub` methods (which returned `Option`) are replaced by +`checked_add` / `checked_sub` with the same semantics. `+`, `-`, `+=`, and `-=` operator impls +are also added. + +```rust +// Before (v0.17) +let future = instant.add(Duration::from_millis(10)).expect("overflow"); +let past = instant.sub(Duration::from_millis(10)).expect("underflow"); + +// After (v0.18): explicit checked form (same semantics): +let future = instant.checked_add(Duration::from_millis(10)).expect("overflow"); +let past = instant.checked_sub(Duration::from_millis(10)).expect("underflow"); + +// Or use the operator (panics on overflow, like std::time::Instant): +let future = instant + Duration::from_millis(10); +let past = instant - Duration::from_millis(10); + +// Subtract two instants to get a Duration (saturates to zero): +let elapsed: Duration = later - earlier; +``` + +**Why:** Aligns the API with `std::time::Instant`, making `StreamInstant` more idiomatic. + +### `new` and `from_nanos` take unsigned integers + +**What changed:** The `secs` parameter of `StreamInstant::new` and the `nanos` parameter of +`StreamInstant::from_nanos` are now `u64` instead of `i64`. + +```rust +// Before (v0.17): negative seconds were accepted +StreamInstant::new(-1_i64, 0); + +// After (v0.18): all stream clocks are non-negative +StreamInstant::new(0_u64, 0); +``` + +**Why:** All audio host clocks are positive and monotonic; they are never negative. + --- # Upgrading from v0.16 to v0.17 diff --git a/examples/custom.rs b/examples/custom.rs index baf405661..5817517a3 100644 --- a/examples/custom.rs +++ b/examples/custom.rs @@ -2,6 +2,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +use std::time::Instant; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, @@ -19,7 +20,10 @@ struct MyDevice; // Only Send+Sync is needed struct MyStream { controls: Arc, - // option is needed since joining a thread takes ownership, + // The instant the audio thread was started; shared with now() so that + // callback timestamps and now() are on the same time base. + start: Instant, + // Option is needed since joining a thread takes ownership, // and we want to do that on drop (gives us &mut self, not self) handle: Option>, } @@ -138,9 +142,9 @@ impl DeviceTrait for MyDevice { pause: AtomicBool::new(true), // streams are expected to start out paused by default }); + let start = Instant::now(); let thread_controls = controls.clone(); let handle = std::thread::spawn(move || { - let start = std::time::Instant::now(); let mut buffer = [0.0_f32; 4096]; while !thread_controls.exit.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_secs_f32( @@ -161,7 +165,7 @@ impl DeviceTrait for MyDevice { ) }; - let duration = std::time::Instant::now().duration_since(start); + let duration = Instant::now().duration_since(start); let secs = duration.as_nanos() / 1_000_000_000; let subsec_nanos = duration.as_nanos() - secs * 1_000_000_000; let stream_instant = cpal::StreamInstant::new(secs as _, subsec_nanos as _); @@ -178,6 +182,7 @@ impl DeviceTrait for MyDevice { Ok(MyStream { controls, + start, handle: Some(handle), }) } @@ -193,6 +198,11 @@ impl StreamTrait for MyStream { self.controls.pause.store(true, Ordering::Relaxed); Ok(()) } + + fn now(&self) -> cpal::StreamInstant { + let elapsed = self.start.elapsed(); + cpal::StreamInstant::from_nanos(elapsed.as_nanos() as u64) + } } // streams are expected to stop when dropped @@ -315,9 +325,7 @@ pub fn make_stream( config, move |output: &mut [f32], _: &cpal::OutputCallbackInfo| { // for 0-1s play sine, 1-2s play square, 2-3s play saw, 3-4s play triangle_wave - let time_since_start = std::time::Instant::now() - .duration_since(time_at_start) - .as_secs_f32(); + let time_since_start = Instant::now().duration_since(time_at_start).as_secs_f32(); if time_since_start < 1.0 { oscillator.set_waveform(Waveform::Sine); } else if time_since_start < 2.0 { diff --git a/src/host/aaudio/convert.rs b/src/host/aaudio/convert.rs index 7a085fa13..65669212c 100644 --- a/src/host/aaudio/convert.rs +++ b/src/host/aaudio/convert.rs @@ -1,5 +1,4 @@ -use std::convert::TryInto; -use std::time::Duration; +//! Time-conversion helpers for the AAudio backend. extern crate ndk; @@ -8,13 +7,17 @@ use crate::{ StreamInstant, }; -pub fn to_stream_instant(duration: Duration) -> StreamInstant { - StreamInstant::new( - duration.as_secs().try_into().unwrap(), - duration.subsec_nanos(), - ) +/// Returns a [`StreamInstant`] for the current moment. +pub fn now_stream_instant() -> StreamInstant { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + StreamInstant::from_nanos(ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64) } +/// Returns the [`StreamInstant`] of the most recent audio frame transferred by `stream`. pub fn stream_instant(stream: &ndk::audio::AudioStream) -> StreamInstant { let ts = stream .timestamp(ndk::audio::Clockid::Monotonic) @@ -22,7 +25,7 @@ pub fn stream_instant(stream: &ndk::audio::AudioStream) -> StreamInstant { frame_position: 0, time_nanoseconds: 0, }); - to_stream_instant(Duration::from_nanos(ts.time_nanoseconds as u64)) + StreamInstant::from_nanos(ts.time_nanoseconds as u64) } impl From for StreamError { diff --git a/src/host/aaudio/mod.rs b/src/host/aaudio/mod.rs index 80abe2795..9a2394f2b 100644 --- a/src/host/aaudio/mod.rs +++ b/src/host/aaudio/mod.rs @@ -6,12 +6,12 @@ use std::cmp; use std::convert::TryInto; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::IntoIter as VecIntoIter; extern crate ndk; -use convert::{stream_instant, to_stream_instant}; +use convert::{now_stream_instant, stream_instant}; use java_interface::{AudioDeviceInfo, AudioManager}; use crate::traits::{DeviceTrait, HostTrait, StreamTrait}; @@ -316,13 +316,12 @@ where E: FnMut(StreamError) + Send + 'static, { let builder = configure_for_device(builder, device, config); - let created = Instant::now(); let channel_count = config.channels as i32; let stream = builder .data_callback(Box::new(move |stream, data, num_frames| { let cb_info = InputCallbackInfo { timestamp: InputStreamTimestamp { - callback: to_stream_instant(created.elapsed()), + callback: now_stream_instant(), capture: stream_instant(stream), }, }; @@ -366,7 +365,6 @@ where E: FnMut(StreamError) + Send + 'static, { let builder = configure_for_device(builder, device, config); - let created = Instant::now(); let channel_count = config.channels as i32; let tune_dynamically = config.buffer_size == BufferSize::Default; @@ -378,7 +376,7 @@ where // Deliver audio data to user callback let cb_info = OutputCallbackInfo { timestamp: OutputStreamTimestamp { - callback: to_stream_instant(created.elapsed()), + callback: now_stream_instant(), playback: stream_instant(stream), }, }; @@ -719,6 +717,10 @@ impl StreamTrait for Stream { } } + fn now(&self) -> crate::StreamInstant { + now_stream_instant() + } + fn buffer_size(&self) -> Option { let stream = self.inner.lock().ok()?; diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index a2c801149..5c03fc566 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1064,12 +1064,7 @@ fn process_input( stream_timestamp_fallback(stream.creation_instant) }?; let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); - let capture = callback - .sub(delay_duration) - .ok_or_else(|| BackendSpecificError { - description: "`capture` is earlier than representation supported by `StreamInstant`" - .to_string(), - })?; + let capture = callback - delay_duration; let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); @@ -1098,12 +1093,7 @@ fn process_output( stream_timestamp_fallback(stream.creation_instant) }?; let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); - let playback = callback - .add(delay_duration) - .ok_or_else(|| BackendSpecificError { - description: "`playback` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; + let playback = callback + delay_duration; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = crate::OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); @@ -1156,7 +1146,7 @@ fn stream_timestamp_hardware( ); return Err(BackendSpecificError { description }); } - Ok(crate::StreamInstant::from_nanos(nanos)) + Ok(crate::StreamInstant::from_nanos(nanos as u64)) } // Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. @@ -1168,9 +1158,7 @@ fn stream_timestamp_fallback( ) -> Result { let now = std::time::Instant::now(); let duration = now.duration_since(creation); - crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { - description: "stream duration has exceeded `StreamInstant` representation".to_string(), - }) + Ok(crate::StreamInstant::from_nanos(duration.as_nanos() as u64)) } // Adapted from `timestamp2ns` here: @@ -1284,6 +1272,18 @@ impl StreamTrait for Stream { self.inner.channel.pause(true).ok(); Ok(()) } + fn now(&self) -> crate::StreamInstant { + if self.inner.use_hw_timestamps { + if let Ok(status) = self.inner.channel.status() { + if let Ok(instant) = stream_timestamp_hardware(&status) { + return instant; + } + } + } + stream_timestamp_fallback(self.inner.creation_instant) + .expect("stream duration exceeded `StreamInstant` range") + } + fn buffer_size(&self) -> Option { Some(self.inner.period_frames as FrameCount) } diff --git a/src/host/asio/mod.rs b/src/host/asio/mod.rs index 4cb4af354..aafdf4225 100644 --- a/src/host/asio/mod.rs +++ b/src/host/asio/mod.rs @@ -156,6 +156,14 @@ impl StreamTrait for Stream { Stream::pause(self) } + fn now(&self) -> crate::StreamInstant { + // `ASIOTimeInfo::systemTime` is specified by the ASIO SDK as nanoseconds + // derived from `timeGetTime()`, so calling it here gives a value on the + // same clock as the `system_time` field delivered to every callback. + let ms = unsafe { windows::Win32::Media::timeGetTime() }; + crate::StreamInstant::from_nanos(ms as i64 * 1_000_000) + } + fn buffer_size(&self) -> Option { Stream::buffer_size(self) } diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 04cc1e8d1..230dcabb8 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -1136,12 +1136,9 @@ unsafe fn apply_output_callback_to_data( interleaved.len(), sample_format, ); - let callback = crate::StreamInstant::from_nanos_i128(asio_info.system_time as i128) - .expect("`system_time` out of range of `StreamInstant` representation"); + let callback = crate::StreamInstant::from_nanos(asio_info.system_time); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + delay; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); @@ -1164,12 +1161,9 @@ unsafe fn apply_input_callback_to_data( interleaved.len(), format, ); - let callback = crate::StreamInstant::from_nanos_i128(asio_info.system_time as i128) - .expect("`system_time` out of range of `StreamInstant` representation"); + let callback = crate::StreamInstant::from_nanos(asio_info.system_time); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = callback - delay; let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; data_callback(&data, &info); diff --git a/src/host/audioworklet/mod.rs b/src/host/audioworklet/mod.rs index 3972d385c..46fa6345f 100644 --- a/src/host/audioworklet/mod.rs +++ b/src/host/audioworklet/mod.rs @@ -243,7 +243,11 @@ impl DeviceTrait for Device { .unwrap_or(0.0); let total_output_latency_secs = { let sum = base_latency_secs + output_latency_secs; - if sum.is_finite() { sum.max(0.0) } else { 0.0 } + if sum.is_finite() { + sum.max(0.0) + } else { + 0.0 + } }; options.set_processor_options(Some(&js_sys::Array::of3( @@ -259,10 +263,8 @@ impl DeviceTrait for Device { let callback = crate::StreamInstant::from_secs_f64(now); let buffer_duration = frames_to_duration(frame_size as _, sample_rate); let playback = callback - .add(buffer_duration + Duration::from_secs_f64(total_output_latency_secs)) - .expect( - "`playback` occurs beyond representation supported by `StreamInstant`", - ); + + (buffer_duration + + Duration::from_secs_f64(total_output_latency_secs)); let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; (data_callback)(&mut data, &info); @@ -319,6 +321,10 @@ impl StreamTrait for Stream { } } } + + fn now(&self) -> crate::StreamInstant { + crate::StreamInstant::from_secs_f64(self.audio_context.current_time()) + } } impl Drop for Stream { diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index 750b5d0d3..2ead1160d 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -279,6 +279,11 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> crate::StreamInstant { + let m_host_time = unsafe { mach2::mach_time::mach_absolute_time() }; + host_time_to_stream_instant(m_host_time).expect("mach_timebase_info failed") + } + fn buffer_size(&self) -> Option { Some(get_device_buffer_frames() as crate::FrameCount) } @@ -501,9 +506,7 @@ where } }); let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = callback - delay; let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -552,9 +555,7 @@ where } }); let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + delay; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 7267d9d50..f08f47103 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -796,9 +796,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = callback - delay; let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -896,9 +894,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + delay; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index d94525037..b0bb9499d 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -263,6 +263,11 @@ impl StreamTrait for Stream { stream.pause() } + fn now(&self) -> crate::StreamInstant { + let m_host_time = unsafe { mach2::mach_time::mach_absolute_time() }; + host_time_to_stream_instant(m_host_time).expect("mach_timebase_info failed") + } + fn buffer_size(&self) -> Option { let stream = self.inner.lock().ok()?; diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index d982ca721..579f24e27 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -81,9 +81,7 @@ fn host_time_to_stream_instant( let res = unsafe { mach2::mach_time::mach_timebase_info(&mut info) }; check_os_status(res)?; let nanos = m_host_time as u128 * info.numer as u128 / info.denom as u128; - crate::StreamInstant::from_nanos_i128(nanos as i128).ok_or(BackendSpecificError { - description: "host time out of range of `StreamInstant` representation".to_string(), - }) + Ok(crate::StreamInstant::from_nanos(nanos as u64)) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/custom/mod.rs b/src/host/custom/mod.rs index 81f7d3018..1a2ac4b02 100644 --- a/src/host/custom/mod.rs +++ b/src/host/custom/mod.rs @@ -177,6 +177,7 @@ trait DeviceErased: Send + Sync { trait StreamErased: Send + Sync { fn play(&self) -> Result<(), PlayStreamError>; fn pause(&self) -> Result<(), PauseStreamError>; + fn now(&self) -> crate::StreamInstant; } fn device_to_erased(d: impl DeviceErased + 'static) -> Device { @@ -312,6 +313,10 @@ where fn pause(&self) -> Result<(), PauseStreamError> { ::pause(self) } + + fn now(&self) -> crate::StreamInstant { + ::now(self) + } } // implementations of HostTrait, DeviceTrait, and StreamTrait for custom versions @@ -435,4 +440,8 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), PauseStreamError> { self.0.pause() } + + fn now(&self) -> crate::StreamInstant { + self.0.now() + } } diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index c43119996..58d9d2503 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -278,6 +278,10 @@ impl StreamTrait for Stream { }); Ok(()) } + + fn now(&self) -> crate::StreamInstant { + crate::StreamInstant::from_secs_f64(self.audio_ctxt.current_time()) + } } fn audio_callback_fn( @@ -305,9 +309,7 @@ where // supported by firefox (2020-04-28). // let latency_secs: f64 = audio_ctxt.outputLatency.try_into().unwrap(); let buffer_duration = frames_to_duration(len, sample_rate as usize); - let playback = callback - .add(buffer_duration) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + buffer_duration; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index dab1cb995..d33a048ed 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -221,6 +221,10 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> crate::StreamInstant { + micros_to_stream_instant(self.async_client.as_client().time()) + } + fn buffer_size(&self) -> Option { Some(self.async_client.as_client().buffer_size() as crate::FrameCount) } @@ -243,7 +247,6 @@ struct LocalProcessHandler { temp_input_buffer: Vec, temp_output_buffer: Vec, playing: Arc, - creation_timestamp: std::time::Instant, /// This should not be called on `process`, only on `buffer_size` because it can block. error_callback_ptr: ErrorCallbackPtr, } @@ -274,7 +277,6 @@ impl LocalProcessHandler { temp_input_buffer, temp_output_buffer, playing, - creation_timestamp: std::time::Instant::now(), error_callback_ptr, } } @@ -288,7 +290,11 @@ fn temp_buffer_to_data(temp_input_buffer: &mut [f32], total_buffer_size: usize) } impl jack::ProcessHandler for LocalProcessHandler { - fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { + fn process( + &mut self, + client: &jack::Client, + process_scope: &jack::ProcessScope, + ) -> jack::Control { if !self.playing.load(Ordering::SeqCst) { return jack::Control::Continue; } @@ -301,20 +307,18 @@ impl jack::ProcessHandler for LocalProcessHandler { let (current_start_usecs, next_usecs_opt) = match process_scope.cycle_times() { Ok(times) => (times.current_usecs, Some(times.next_usecs)), Err(_) => { - // jack was unable to get the current time information - // Fall back to using Instants - let now = std::time::Instant::now(); - let duration = now.duration_since(self.creation_timestamp); - (duration.as_micros() as u64, None) + // JACK was unable to get the current time information. + // Fall back to jack_get_time(), which is the same clock source + // used by now() and cycle_times(), so the epoch stays consistent. + (client.time(), None) } }; let start_cycle_instant = micros_to_stream_instant(current_start_usecs); let start_callback_instant = start_cycle_instant - .add(frames_to_duration( + + frames_to_duration( process_scope.frames_since_cycle_start() as usize, self.sample_rate, - )) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + ); if let Some(input_callback) = &mut self.input_data_callback { // Let's get the data from the input ports and run the callback @@ -357,9 +361,9 @@ impl jack::ProcessHandler for LocalProcessHandler { // exact instant at which the last sample written here will be consumed by the device. let playback = match next_usecs_opt { Some(next_usecs) => micros_to_stream_instant(next_usecs), - None => start_cycle_instant - .add(frames_to_duration(current_frame_count, self.sample_rate)) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"), + None => { + start_cycle_instant + frames_to_duration(current_frame_count, self.sample_rate) + } }; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = crate::OutputCallbackInfo { timestamp }; @@ -400,8 +404,7 @@ impl jack::ProcessHandler for LocalProcessHandler { } fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { - crate::StreamInstant::from_nanos_i128(micros as i128 * 1_000) - .expect("`micros` out of range of `StreamInstant` representation") + crate::StreamInstant::from_nanos((micros as u128 * 1_000) as u64) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 9f41bd4c4..1170708c5 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -4,9 +4,10 @@ use std::{ Arc, }, thread::JoinHandle, - time::Instant, }; +use libc; + use crate::{ host::fill_with_equilibrium, traits::StreamTrait, BackendSpecificError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, StreamConfig, StreamError, StreamInstant, @@ -68,6 +69,10 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> crate::StreamInstant { + monotonic_stream_instant().expect("clock_gettime failed") + } + fn buffer_size(&self) -> Option { match self.last_quantum.load(Ordering::Relaxed) { 0 => None, @@ -142,7 +147,6 @@ pub struct UserData { error_callback: E, sample_format: SampleFormat, format: pw::spa::param::audio::AudioInfoRaw, - created_instance: Instant, last_quantum: Arc, } impl UserData @@ -211,18 +215,14 @@ where self.last_quantum.store(frames as u64, Ordering::Relaxed); let (callback, capture) = match pw_stream_time(stream) { Some(PwTime { now_ns, delay_ns }) => ( - StreamInstant::from_nanos(now_ns), - StreamInstant::from_nanos(now_ns - delay_ns), + StreamInstant::from_nanos(now_ns as u64), + StreamInstant::from_nanos((now_ns - delay_ns.max(0)) as u64), ), None => { - let cb = stream_timestamp_fallback(self.created_instance)?; - let pl = cb - .sub(frames_to_duration(frames, self.format.rate())) - .ok_or_else(|| BackendSpecificError { - description: - "`capture` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; + let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { + description: "clock_gettime failed".to_owned(), + })?; + let pl = cb - frames_to_duration(frames, self.format.rate()); (cb, pl) } }; @@ -246,18 +246,14 @@ where self.last_quantum.store(frames as u64, Ordering::Relaxed); let (callback, playback) = match pw_stream_time(stream) { Some(PwTime { now_ns, delay_ns }) => ( - StreamInstant::from_nanos(now_ns), - StreamInstant::from_nanos(now_ns + delay_ns), + StreamInstant::from_nanos(now_ns as u64), + StreamInstant::from_nanos((now_ns + delay_ns.max(0)) as u64), ), None => { - let cb = stream_timestamp_fallback(self.created_instance)?; - let pl = cb - .add(frames_to_duration(frames, self.format.rate())) - .ok_or_else(|| BackendSpecificError { - description: - "`playback` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; + let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { + description: "clock_gettime failed".to_owned(), + })?; + let pl = cb + frames_to_duration(frames, self.format.rate()); (cb, pl) } }; @@ -274,18 +270,22 @@ pub struct StreamData { pub context: ContextRc, } -// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. -// -// This ensures positive values that are compatible with our `StreamInstant` representation. -#[inline] -fn stream_timestamp_fallback( - creation: std::time::Instant, -) -> Result { - let now = std::time::Instant::now(); - let duration = now.duration_since(creation); - StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { - description: "stream duration has exceeded `StreamInstant` representation".to_string(), - }) +/// Read `clock_gettime` and return it as a [`StreamInstant`]. +/// +/// This is the same clock used by `pw_stream_get_time_n` (`pw_time.now`), so values +/// returned here are directly comparable with the `callback`/`capture`/`playback` +/// instants delivered to the data callback. +fn monotonic_stream_instant() -> Option { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + if rc == 0 { + Some(StreamInstant::new(ts.tv_sec as u64, ts.tv_nsec as u32)) + } else { + None + } } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. @@ -319,7 +319,6 @@ where error_callback, sample_format, format: Default::default(), - created_instance: Instant::now(), last_quantum, }; let channels = config.channels as _; @@ -471,7 +470,6 @@ where error_callback, sample_format, format: Default::default(), - created_instance: Instant::now(), last_quantum, }; diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 0d63b3c20..f55c21968 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -18,17 +18,17 @@ use crate::{ const LATENCY_POLL_INTERVAL: Duration = Duration::from_millis(5); pub enum Stream { - Playback(pulseaudio::PlaybackStream), - Record(pulseaudio::RecordStream), + Playback(pulseaudio::PlaybackStream, Instant), + Record(pulseaudio::RecordStream, Instant), } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { match self { - Stream::Playback(stream) => { + Stream::Playback(stream, _) => { block_on(stream.uncork()).map_err(Into::::into)?; } - Stream::Record(stream) => { + Stream::Record(stream, _) => { block_on(stream.uncork()).map_err(Into::::into)?; block_on(stream.started()).map_err(Into::::into)?; } @@ -39,21 +39,29 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), crate::PauseStreamError> { let res = match self { - Stream::Playback(stream) => block_on(stream.cork()), - Stream::Record(stream) => block_on(stream.cork()), + Stream::Playback(stream, _) => block_on(stream.cork()), + Stream::Record(stream, _) => block_on(stream.cork()), }; res.map_err(Into::::into)?; Ok(()) } + fn now(&self) -> crate::StreamInstant { + let start = match self { + Stream::Playback(_, start) | Stream::Record(_, start) => *start, + }; + let elapsed = start.elapsed(); + StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()) + } + fn buffer_size(&self) -> Option { let (spec, bytes) = match self { - Stream::Playback(s) => ( + Stream::Playback(s, _) => ( s.sample_spec(), s.buffer_attr().minimum_request_length as usize, ), - Stream::Record(s) => (s.sample_spec(), s.buffer_attr().fragment_size as usize), + Stream::Record(s, _) => (s.sample_spec(), s.buffer_attr().fragment_size as usize), }; let frame_size = spec.channels as usize * spec.format.bytes_per_sample(); if bytes > 0 { @@ -75,8 +83,7 @@ impl Stream { D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - // Use a monotonic clock relative to stream creation for StreamInstants. - let start = std::time::Instant::now(); + let start = Instant::now(); let current_latency_micros = Arc::new(AtomicU64::new(0)); // Microseconds since stream creation at the time of the last latency poll, used @@ -122,14 +129,8 @@ impl Stream { let playback_time = elapsed + Duration::from_micros(latency); let timestamp = OutputStreamTimestamp { - callback: StreamInstant { - secs: elapsed.as_secs() as i64, - nanos: elapsed.subsec_nanos(), - }, - playback: StreamInstant { - secs: playback_time.as_secs() as i64, - nanos: playback_time.subsec_nanos(), - }, + callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()), + playback: StreamInstant::new(playback_time.as_secs(), playback_time.subsec_nanos()), }; // Preemptively fill the buffer with silence in case the user @@ -202,7 +203,7 @@ impl Stream { std::thread::sleep(LATENCY_POLL_INTERVAL); }); - Ok(Self::Playback(stream)) + Ok(Self::Playback(stream, start)) } pub fn new_record( @@ -234,14 +235,8 @@ impl Stream { .unwrap_or_default(); let timestamp = InputStreamTimestamp { - callback: StreamInstant { - secs: elapsed.as_secs() as i64, - nanos: elapsed.subsec_nanos(), - }, - capture: StreamInstant { - secs: capture_time.as_secs() as i64, - nanos: capture_time.subsec_nanos(), - }, + callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()), + capture: StreamInstant::new(capture_time.as_secs(), capture_time.subsec_nanos()), }; let bps = sample_spec.format.bytes_per_sample(); @@ -286,7 +281,7 @@ impl Stream { std::thread::sleep(LATENCY_POLL_INTERVAL); }); - Ok(Self::Record(stream)) + Ok(Self::Record(stream, start)) } } diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 03d694f05..20c4b2942 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -12,6 +12,7 @@ use std::time::Duration; use windows::Win32::Foundation; use windows::Win32::Foundation::WAIT_OBJECT_0; use windows::Win32::Media::Audio; +use windows::Win32::System::Performance; use windows::Win32::System::SystemServices; use windows::Win32::System::Threading; @@ -215,6 +216,19 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> crate::StreamInstant { + let mut counter: i64 = 0; + let mut freq: i64 = 0; + unsafe { + Performance::QueryPerformanceCounter(&mut counter) + .expect("QueryPerformanceCounter failed"); + Performance::QueryPerformanceFrequency(&mut freq) + .expect("QueryPerformanceFrequency failed"); + } + let nanos = (counter as u128 * 1_000_000_000 / freq as u128) as u64; + crate::StreamInstant::from_nanos(nanos) + } + fn buffer_size(&self) -> Option { Some(self.period_frames) } @@ -571,9 +585,8 @@ fn stream_instant(stream: &StreamInner) -> Result)?; }; // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = qpc_position as i128 * 100; - let instant = crate::StreamInstant::from_nanos_i128(qpc_nanos) - .expect("performance counter out of range of `StreamInstant` representation"); + let qpc_nanos = (qpc_position as u128 * 100) as u64; + let instant = crate::StreamInstant::from_nanos(qpc_nanos); Ok(instant) } @@ -587,9 +600,8 @@ fn input_timestamp( buffer_qpc_position: u64, ) -> Result { // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = buffer_qpc_position as i128 * 100; - let capture = crate::StreamInstant::from_nanos_i128(qpc_nanos) - .expect("performance counter out of range of `StreamInstant` representation"); + let qpc_nanos = (buffer_qpc_position as u128 * 100) as u64; + let capture = crate::StreamInstant::from_nanos(qpc_nanos); let callback = stream_instant(stream)?; Ok(crate::InputStreamTimestamp { capture, callback }) } @@ -609,8 +621,6 @@ fn output_timestamp( // `padding` is the number of frames already queued in the endpoint buffer ahead of the // frames we are about to write. Those frames must drain before ours are heard. let padding = stream.max_frames_in_buffer - frames_available; - let playback = callback - .add(frames_to_duration(padding, sample_rate) + stream.stream_latency) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + (frames_to_duration(padding, sample_rate) + stream.stream_latency); Ok(crate::OutputStreamTimestamp { callback, playback }) } diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index fe8f1a457..33fd7c3c2 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -496,6 +496,10 @@ impl StreamTrait for Stream { } } + fn now(&self) -> crate::StreamInstant { + crate::StreamInstant::from_secs_f64(self.ctx.current_time()) + } + fn buffer_size(&self) -> Option { Some(self.buffer_size_frames as crate::FrameCount) } diff --git a/src/lib.rs b/src/lib.rs index 71787ccfc..a89142996 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -475,23 +475,38 @@ pub struct Data { /// 2. The same time source used to generate timestamps for a stream's underlying audio data /// callback. /// -/// `StreamInstant` represents a duration since an unspecified origin point. The origin -/// is guaranteed to occur at or before the stream starts, and remains consistent for the -/// lifetime of that stream. Different streams may have different origins. +/// `StreamInstant` represents a moment on a stream's monotonic clock. Because the underlying clock +/// is monotonic, `StreamInstant` values are always positive and increasing. /// -/// ## Host `StreamInstant` Sources +/// Within a single stream, all instants share the same clock, so arithmetic between them is +/// meaningful. Across different streams, origins are not guaranteed to be shared. On some hosts +/// each stream starts its own independent clock at zero, so subtracting a timestamp from one +/// stream and one from another may produce a meaningless result. /// -/// | Host | Source | -/// | ---- | ------ | -/// | alsa | `snd_pcm_status_get_htstamp` | -/// | asio | `timeGetTime` | -/// | coreaudio | `mach_absolute_time` | -/// | emscripten | `AudioContext.getOutputTimestamp` | -/// | pulseaudio | `std::time::Instant` | -/// | wasapi | `QueryPerformanceCounter` | +/// ## Time sources by host +/// +/// | Host | Time source | +/// | ---- | ----------- | +/// | AAudio | `AAudioStream_getTimestamp(CLOCK_MONOTONIC)` | +/// | ALSA | `snd_pcm_status_get_htstamp()` | +/// | ASIO | `timeGetTime()` | +/// | AudioWorklet | `AudioContext.currentTime` | +/// | CoreAudio | `mach_absolute_time()` | +/// | Emscripten | `AudioContext.currentTime` | +/// | JACK | `jack_get_time()` | +/// | PipeWire | `pw_stream_get_time_n()` | +/// | PulseAudio | `std::time::Instant` | +/// | WASAPI | `QueryPerformanceCounter()` | +/// | WebAudio | `AudioContext.currentTime` | +/// +/// > **Disclaimer:** These system calls might change over time. +/// +/// > **Note:** Mathematical operations like [`add`][StreamInstant::add] may panic if the result +/// > cannot be represented as a `StreamInstant`. Use [`checked_add`][StreamInstant::checked_add] +/// > or [`checked_sub`][StreamInstant::checked_sub] for non-panicking variants. #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct StreamInstant { - secs: i64, + secs: u64, nanos: u32, } @@ -571,11 +586,10 @@ impl SupportedStreamConfig { } impl StreamInstant { - /// The amount of time elapsed from another instant to this one. - /// - /// Returns `None` if `earlier` is later than self. - pub fn duration_since(&self, earlier: &Self) -> Option { - if self < earlier { + /// Returns the amount of time elapsed from `earlier` to `self`, or `None` if `earlier` is + /// later than `self`. + pub fn checked_duration_since(&self, earlier: StreamInstant) -> Option { + if self < &earlier { None } else { (self.as_nanos() - earlier.as_nanos()) @@ -585,59 +599,145 @@ impl StreamInstant { } } - /// Returns the instant in time after the given duration has passed. - /// - /// Returns `None` if the resulting instant would exceed the bounds of the underlying data - /// structure. - pub fn add(&self, duration: Duration) -> Option { + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn saturating_duration_since(&self, earlier: StreamInstant) -> Duration { + self.checked_duration_since(earlier).unwrap_or_default() + } + + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn duration_since(&self, earlier: StreamInstant) -> Duration { + self.saturating_duration_since(earlier) + } + + /// Returns `Some(t)` where `t` is `self + duration`, or `None` if the result cannot be + /// represented as a `StreamInstant`. + pub fn checked_add(&self, duration: Duration) -> Option { self.as_nanos() - .checked_add(duration.as_nanos() as i128) - .and_then(Self::from_nanos_i128) + .checked_add(duration.as_nanos()) + .and_then(|n| u64::try_from(n).ok()) + .map(Self::from_nanos) } - /// Returns the instant in time one `duration` ago. - /// - /// Returns `None` if the resulting instant would underflow. As a result, it is important to - /// consider that on some platforms the [`StreamInstant`] may begin at `0` from the moment the - /// source stream is created. - pub fn sub(&self, duration: Duration) -> Option { + /// Returns `Some(t)` where `t` is `self - duration`, or `None` if the result cannot be + /// represented as a `StreamInstant` (i.e. would be negative). + pub fn checked_sub(&self, duration: Duration) -> Option { self.as_nanos() - .checked_sub(duration.as_nanos() as i128) - .and_then(Self::from_nanos_i128) + .checked_sub(duration.as_nanos()) + .and_then(|n| u64::try_from(n).ok()) + .map(Self::from_nanos) } - fn as_nanos(&self) -> i128 { - (self.secs as i128 * 1_000_000_000) + self.nanos as i128 + /// Returns the total number of nanoseconds contained by this `StreamInstant`. + pub fn as_nanos(&self) -> u128 { + self.secs as u128 * 1_000_000_000 + self.nanos as u128 } - #[allow(dead_code)] - fn from_nanos(nanos: i64) -> Self { + /// Creates a new `StreamInstant` from the specified number of nanoseconds. + /// + /// Note: Using this on the return value of `as_nanos()` might cause unexpected behavior: + /// `as_nanos()` returns a `u128`, and can return values that do not fit in `u64`, e.g. 585 + /// years. Instead, consider using the pattern + /// `StreamInstant::new(t.as_secs(), t.subsec_nanos())` if you cannot copy/clone the + /// `StreamInstant` directly. + pub fn from_nanos(nanos: u64) -> Self { let secs = nanos / 1_000_000_000; - let subsec_nanos = nanos - secs * 1_000_000_000; - Self::new(secs, subsec_nanos as u32) + let subsec_nanos = (nanos % 1_000_000_000) as u32; + Self::new(secs, subsec_nanos) } - #[allow(dead_code)] - fn from_nanos_i128(nanos: i128) -> Option { - let secs = nanos / 1_000_000_000; - if secs > i64::MAX as i128 || secs < i64::MIN as i128 { - None - } else { - let subsec_nanos = nanos - secs * 1_000_000_000; - debug_assert!(subsec_nanos < u32::MAX as i128); - Some(Self::new(secs as i64, subsec_nanos as u32)) + /// Creates a new `StreamInstant` from the specified number of seconds represented as `f64`. + /// + /// # Panics + /// + /// Panics if `secs` is negative, not finite, or overflows the range of `StreamInstant`. + pub fn from_secs_f64(secs: f64) -> Self { + const NANOS_PER_SEC: u128 = 1_000_000_000; + const MAX_NANOS: f64 = ((u64::MAX as u128 + 1) * NANOS_PER_SEC) as f64; + let nanos = secs * NANOS_PER_SEC as f64; + if !nanos.is_finite() || nanos < 0.0 || nanos >= MAX_NANOS { + panic!("StreamInstant::from_secs_f64 called with invalid value: {secs}"); + } + let nanos = nanos as u128; + Self::new( + (nanos / NANOS_PER_SEC) as u64, + (nanos % NANOS_PER_SEC) as u32, + ) + } + + /// Creates a new `StreamInstant` from the specified number of whole seconds and additional + /// nanoseconds. + /// + /// If `nanos` is greater than or equal to 1 billion (the number of nanoseconds in a second), + /// the excess carries over into `secs`. + /// + /// # Panics + /// + /// Panics if the carry from `nanos` overflows the seconds counter. + pub fn new(secs: u64, nanos: u32) -> Self { + let carry = nanos / 1_000_000_000; + let subsec_nanos = nanos % 1_000_000_000; + let secs = secs + .checked_add(carry as u64) + .expect("overflow in StreamInstant::new"); + StreamInstant { + secs, + nanos: subsec_nanos, } } +} + +impl std::ops::Add for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result overflows the range of `StreamInstant`. Use + /// [`checked_add`][StreamInstant::checked_add] for a non-panicking variant. + #[inline] + fn add(self, rhs: Duration) -> StreamInstant { + self.checked_add(rhs) + .expect("overflow when adding duration to stream instant") + } +} + +impl std::ops::AddAssign for StreamInstant { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl std::ops::Sub for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result underflows the range of `StreamInstant`. Use + /// [`checked_sub`][StreamInstant::checked_sub] for a non-panicking variant. + #[inline] + fn sub(self, rhs: Duration) -> StreamInstant { + self.checked_sub(rhs) + .expect("overflow when subtracting duration from stream instant") + } +} - #[allow(dead_code)] - fn from_secs_f64(secs: f64) -> crate::StreamInstant { - let s = secs.floor() as i64; - let ns = ((secs - s as f64) * 1_000_000_000.0) as u32; - Self::new(s, ns) +impl std::ops::SubAssign for StreamInstant { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; } +} + +impl std::ops::Sub for StreamInstant { + type Output = Duration; - pub fn new(secs: i64, nanos: u32) -> Self { - StreamInstant { secs, nanos } + /// Returns the duration from `rhs` to `self`, saturating to [`Duration::ZERO`] if `rhs` is + /// later than `self`. + #[inline] + fn sub(self, rhs: StreamInstant) -> Duration { + self.saturating_duration_since(rhs) } } @@ -1008,34 +1108,83 @@ pub(crate) const COMMON_SAMPLE_RATES: &[SampleRate] = &[ #[test] fn test_stream_instant() { + let z = StreamInstant::new(0, 0); // origin let a = StreamInstant::new(2, 0); - let b = StreamInstant::new(-2, 0); - let min = StreamInstant::new(i64::MIN, 0); - let max = StreamInstant::new(i64::MAX, 0); + let max = StreamInstant::from_nanos(u64::MAX); // largest representable instant + assert_eq!( - a.sub(Duration::from_secs(1)), + a.checked_sub(Duration::from_secs(1)), Some(StreamInstant::new(1, 0)) ); assert_eq!( - a.sub(Duration::from_secs(2)), + a.checked_sub(Duration::from_secs(2)), Some(StreamInstant::new(0, 0)) ); + assert_eq!(a.checked_sub(Duration::from_secs(3)), None); // would go below zero + assert_eq!(z.checked_sub(Duration::from_nanos(1)), None); // underflow at origin + assert_eq!( - a.sub(Duration::from_secs(3)), - Some(StreamInstant::new(-1, 0)) + a.checked_add(Duration::from_secs(1)), + Some(StreamInstant::new(3, 0)) ); - assert_eq!(min.sub(Duration::from_secs(1)), None); + assert_eq!(max.checked_add(Duration::from_nanos(1)), None); // overflow + + assert_eq!(a.duration_since(z), Duration::from_secs(2)); + assert_eq!(z.duration_since(a), Duration::ZERO); // saturates + assert_eq!(a.checked_duration_since(z), Some(Duration::from_secs(2))); + assert_eq!(z.checked_duration_since(a), None); + assert_eq!(a.saturating_duration_since(z), Duration::from_secs(2)); + assert_eq!(z.saturating_duration_since(a), Duration::ZERO); + + assert_eq!(z + Duration::from_secs(2), a); + assert_eq!(a - Duration::from_secs(2), z); + assert_eq!(a - z, Duration::from_secs(2)); + assert_eq!(z - a, Duration::ZERO); // saturates via Sub + let mut c = z; + c += Duration::from_secs(2); + assert_eq!(c, a); + let mut d = a; + d -= Duration::from_secs(2); + assert_eq!(d, z); + + // nanosecond carry assert_eq!( - b.add(Duration::from_secs(1)), - Some(StreamInstant::new(-1, 0)) + StreamInstant::new(1, 1_500_000_000), + StreamInstant::new(2, 500_000_000) ); assert_eq!( - b.add(Duration::from_secs(2)), - Some(StreamInstant::new(0, 0)) + StreamInstant::new(0, 1_000_000_000), + StreamInstant::new(1, 0) ); + + // basic round-trip assert_eq!( - b.add(Duration::from_secs(3)), - Some(StreamInstant::new(1, 0)) + StreamInstant::from_secs_f64(1.5), + StreamInstant::new(1, 500_000_000) ); - assert_eq!(max.add(Duration::from_secs(1)), None); + assert_eq!(StreamInstant::from_secs_f64(0.0), z); +} + +#[test] +#[should_panic] +fn test_stream_instant_new_overflow() { + StreamInstant::new(u64::MAX, 1_000_000_000); // carry overflows u64 +} + +#[test] +#[should_panic] +fn test_stream_instant_from_secs_f64_negative() { + StreamInstant::from_secs_f64(-1.0); +} + +#[test] +#[should_panic] +fn test_stream_instant_from_secs_f64_nan() { + StreamInstant::from_secs_f64(f64::NAN); +} + +#[test] +#[should_panic] +fn test_stream_instant_from_secs_f64_infinite() { + StreamInstant::from_secs_f64(f64::INFINITY); } diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 116d1b1d0..0a2d953e4 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -614,6 +614,17 @@ macro_rules! impl_platform_host { )* } } + + fn now(&self) -> crate::StreamInstant { + match self.0 { + $( + $(#[cfg($feat)])? + StreamInner::$HostVariant(ref s) => { + s.now() + } + )* + } + } } impl From for Device { diff --git a/src/traits.rs b/src/traits.rs index 0b0ee0e8c..9314270df 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -11,7 +11,8 @@ use crate::{ BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, PauseStreamError, PlayStreamError, SampleFormat, SizedSample, StreamConfig, - StreamError, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, + StreamError, StreamInstant, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, }; /// A [`Host`] provides access to the available audio devices on the system. @@ -321,6 +322,17 @@ pub trait StreamTrait { fn buffer_size(&self) -> Option { None } + + /// Returns a [`StreamInstant`] representing the current moment on the stream's clock. + /// + /// The clock is **monotonic**: successive calls to `now()` will never return a value earlier + /// than a previous one, and the returned value will never be earlier than any `callback`, + /// `capture`, or `playback` instant already delivered to the stream's data callback. + /// + /// The returned value shares the same time base as the [`StreamInstant`]s delivered to the + /// stream's data callback via [`crate::InputStreamTimestamp::callback`] and + /// [`crate::OutputStreamTimestamp::callback`], so durations between them are meaningful. + fn now(&self) -> StreamInstant; } /// Compile-time assertion that a stream type implements [`Send`]. From af7dafeda26c497b590c8b0267c4afe615068dc4 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 30 Mar 2026 21:38:19 +0200 Subject: [PATCH 02/17] refactor: split off timestamps into separate module --- src/lib.rs | 341 +--------------------------------------------- src/timestamp.rs | 348 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 353 insertions(+), 336 deletions(-) create mode 100644 src/timestamp.rs diff --git a/src/lib.rs b/src/lib.rs index a89142996..8cd710d4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,12 +188,11 @@ pub use platform::{ SupportedInputConfigs, SupportedOutputConfigs, ALL_HOSTS, }; pub use samples_formats::{FromSample, Sample, SampleFormat, SizedSample, I24, U24}; -use std::convert::TryInto; -use std::time::Duration; pub mod device_description; mod error; mod host; +mod timestamp; pub mod platform; mod samples_formats; pub mod traits; @@ -469,81 +468,10 @@ pub struct Data { sample_format: SampleFormat, } -/// A monotonic time instance associated with a stream, retrieved from either: -/// -/// 1. A timestamp provided to the stream's underlying audio data callback or -/// 2. The same time source used to generate timestamps for a stream's underlying audio data -/// callback. -/// -/// `StreamInstant` represents a moment on a stream's monotonic clock. Because the underlying clock -/// is monotonic, `StreamInstant` values are always positive and increasing. -/// -/// Within a single stream, all instants share the same clock, so arithmetic between them is -/// meaningful. Across different streams, origins are not guaranteed to be shared. On some hosts -/// each stream starts its own independent clock at zero, so subtracting a timestamp from one -/// stream and one from another may produce a meaningless result. -/// -/// ## Time sources by host -/// -/// | Host | Time source | -/// | ---- | ----------- | -/// | AAudio | `AAudioStream_getTimestamp(CLOCK_MONOTONIC)` | -/// | ALSA | `snd_pcm_status_get_htstamp()` | -/// | ASIO | `timeGetTime()` | -/// | AudioWorklet | `AudioContext.currentTime` | -/// | CoreAudio | `mach_absolute_time()` | -/// | Emscripten | `AudioContext.currentTime` | -/// | JACK | `jack_get_time()` | -/// | PipeWire | `pw_stream_get_time_n()` | -/// | PulseAudio | `std::time::Instant` | -/// | WASAPI | `QueryPerformanceCounter()` | -/// | WebAudio | `AudioContext.currentTime` | -/// -/// > **Disclaimer:** These system calls might change over time. -/// -/// > **Note:** Mathematical operations like [`add`][StreamInstant::add] may panic if the result -/// > cannot be represented as a `StreamInstant`. Use [`checked_add`][StreamInstant::checked_add] -/// > or [`checked_sub`][StreamInstant::checked_sub] for non-panicking variants. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub struct StreamInstant { - secs: u64, - nanos: u32, -} - -/// A timestamp associated with a call to an input stream's data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct InputStreamTimestamp { - /// The instant the stream's data callback was invoked. - pub callback: StreamInstant, - /// The instant that data was captured from the device. - /// - /// E.g. The instant data was read from an ADC. - pub capture: StreamInstant, -} - -/// A timestamp associated with a call to an output stream's data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct OutputStreamTimestamp { - /// The instant the stream's data callback was invoked. - pub callback: StreamInstant, - /// The predicted instant that data written will be delivered to the device for playback. - /// - /// E.g. The instant data will be played by a DAC. - pub playback: StreamInstant, -} - -/// Information relevant to a single call to the user's input stream data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct InputCallbackInfo { - timestamp: InputStreamTimestamp, -} - -/// Information relevant to a single call to the user's output stream data callback. -#[cfg_attr(target_os = "emscripten", wasm_bindgen)] -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct OutputCallbackInfo { - timestamp: OutputStreamTimestamp, -} +pub use timestamp::{ + InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, + StreamInstant, +}; impl SupportedStreamConfig { pub fn new( @@ -585,183 +513,6 @@ impl SupportedStreamConfig { } } -impl StreamInstant { - /// Returns the amount of time elapsed from `earlier` to `self`, or `None` if `earlier` is - /// later than `self`. - pub fn checked_duration_since(&self, earlier: StreamInstant) -> Option { - if self < &earlier { - None - } else { - (self.as_nanos() - earlier.as_nanos()) - .try_into() - .ok() - .map(Duration::from_nanos) - } - } - - /// Returns the amount of time elapsed from `earlier` to `self`, saturating to - /// [`Duration::ZERO`] if `earlier` is later than `self`. - pub fn saturating_duration_since(&self, earlier: StreamInstant) -> Duration { - self.checked_duration_since(earlier).unwrap_or_default() - } - - /// Returns the amount of time elapsed from `earlier` to `self`, saturating to - /// [`Duration::ZERO`] if `earlier` is later than `self`. - pub fn duration_since(&self, earlier: StreamInstant) -> Duration { - self.saturating_duration_since(earlier) - } - - /// Returns `Some(t)` where `t` is `self + duration`, or `None` if the result cannot be - /// represented as a `StreamInstant`. - pub fn checked_add(&self, duration: Duration) -> Option { - self.as_nanos() - .checked_add(duration.as_nanos()) - .and_then(|n| u64::try_from(n).ok()) - .map(Self::from_nanos) - } - - /// Returns `Some(t)` where `t` is `self - duration`, or `None` if the result cannot be - /// represented as a `StreamInstant` (i.e. would be negative). - pub fn checked_sub(&self, duration: Duration) -> Option { - self.as_nanos() - .checked_sub(duration.as_nanos()) - .and_then(|n| u64::try_from(n).ok()) - .map(Self::from_nanos) - } - - /// Returns the total number of nanoseconds contained by this `StreamInstant`. - pub fn as_nanos(&self) -> u128 { - self.secs as u128 * 1_000_000_000 + self.nanos as u128 - } - - /// Creates a new `StreamInstant` from the specified number of nanoseconds. - /// - /// Note: Using this on the return value of `as_nanos()` might cause unexpected behavior: - /// `as_nanos()` returns a `u128`, and can return values that do not fit in `u64`, e.g. 585 - /// years. Instead, consider using the pattern - /// `StreamInstant::new(t.as_secs(), t.subsec_nanos())` if you cannot copy/clone the - /// `StreamInstant` directly. - pub fn from_nanos(nanos: u64) -> Self { - let secs = nanos / 1_000_000_000; - let subsec_nanos = (nanos % 1_000_000_000) as u32; - Self::new(secs, subsec_nanos) - } - - /// Creates a new `StreamInstant` from the specified number of seconds represented as `f64`. - /// - /// # Panics - /// - /// Panics if `secs` is negative, not finite, or overflows the range of `StreamInstant`. - pub fn from_secs_f64(secs: f64) -> Self { - const NANOS_PER_SEC: u128 = 1_000_000_000; - const MAX_NANOS: f64 = ((u64::MAX as u128 + 1) * NANOS_PER_SEC) as f64; - let nanos = secs * NANOS_PER_SEC as f64; - if !nanos.is_finite() || nanos < 0.0 || nanos >= MAX_NANOS { - panic!("StreamInstant::from_secs_f64 called with invalid value: {secs}"); - } - let nanos = nanos as u128; - Self::new( - (nanos / NANOS_PER_SEC) as u64, - (nanos % NANOS_PER_SEC) as u32, - ) - } - - /// Creates a new `StreamInstant` from the specified number of whole seconds and additional - /// nanoseconds. - /// - /// If `nanos` is greater than or equal to 1 billion (the number of nanoseconds in a second), - /// the excess carries over into `secs`. - /// - /// # Panics - /// - /// Panics if the carry from `nanos` overflows the seconds counter. - pub fn new(secs: u64, nanos: u32) -> Self { - let carry = nanos / 1_000_000_000; - let subsec_nanos = nanos % 1_000_000_000; - let secs = secs - .checked_add(carry as u64) - .expect("overflow in StreamInstant::new"); - StreamInstant { - secs, - nanos: subsec_nanos, - } - } -} - -impl std::ops::Add for StreamInstant { - type Output = StreamInstant; - - /// # Panics - /// - /// Panics if the result overflows the range of `StreamInstant`. Use - /// [`checked_add`][StreamInstant::checked_add] for a non-panicking variant. - #[inline] - fn add(self, rhs: Duration) -> StreamInstant { - self.checked_add(rhs) - .expect("overflow when adding duration to stream instant") - } -} - -impl std::ops::AddAssign for StreamInstant { - #[inline] - fn add_assign(&mut self, rhs: Duration) { - *self = *self + rhs; - } -} - -impl std::ops::Sub for StreamInstant { - type Output = StreamInstant; - - /// # Panics - /// - /// Panics if the result underflows the range of `StreamInstant`. Use - /// [`checked_sub`][StreamInstant::checked_sub] for a non-panicking variant. - #[inline] - fn sub(self, rhs: Duration) -> StreamInstant { - self.checked_sub(rhs) - .expect("overflow when subtracting duration from stream instant") - } -} - -impl std::ops::SubAssign for StreamInstant { - #[inline] - fn sub_assign(&mut self, rhs: Duration) { - *self = *self - rhs; - } -} - -impl std::ops::Sub for StreamInstant { - type Output = Duration; - - /// Returns the duration from `rhs` to `self`, saturating to [`Duration::ZERO`] if `rhs` is - /// later than `self`. - #[inline] - fn sub(self, rhs: StreamInstant) -> Duration { - self.saturating_duration_since(rhs) - } -} - -impl InputCallbackInfo { - pub fn new(timestamp: InputStreamTimestamp) -> Self { - Self { timestamp } - } - - /// The timestamp associated with the call to an input stream's data callback. - pub fn timestamp(&self) -> InputStreamTimestamp { - self.timestamp - } -} - -impl OutputCallbackInfo { - pub fn new(timestamp: OutputStreamTimestamp) -> Self { - Self { timestamp } - } - - /// The timestamp associated with the call to an output stream's data callback. - pub fn timestamp(&self) -> OutputStreamTimestamp { - self.timestamp - } -} // Note: Data does not implement `is_empty()` because it always contains a valid audio buffer // by design. The buffer may contain silence, but it is never structurally empty. @@ -1106,85 +857,3 @@ pub(crate) const COMMON_SAMPLE_RATES: &[SampleRate] = &[ 176400, 192000, 352800, 384000, 705600, 768000, 1411200, 1536000, ]; -#[test] -fn test_stream_instant() { - let z = StreamInstant::new(0, 0); // origin - let a = StreamInstant::new(2, 0); - let max = StreamInstant::from_nanos(u64::MAX); // largest representable instant - - assert_eq!( - a.checked_sub(Duration::from_secs(1)), - Some(StreamInstant::new(1, 0)) - ); - assert_eq!( - a.checked_sub(Duration::from_secs(2)), - Some(StreamInstant::new(0, 0)) - ); - assert_eq!(a.checked_sub(Duration::from_secs(3)), None); // would go below zero - assert_eq!(z.checked_sub(Duration::from_nanos(1)), None); // underflow at origin - - assert_eq!( - a.checked_add(Duration::from_secs(1)), - Some(StreamInstant::new(3, 0)) - ); - assert_eq!(max.checked_add(Duration::from_nanos(1)), None); // overflow - - assert_eq!(a.duration_since(z), Duration::from_secs(2)); - assert_eq!(z.duration_since(a), Duration::ZERO); // saturates - assert_eq!(a.checked_duration_since(z), Some(Duration::from_secs(2))); - assert_eq!(z.checked_duration_since(a), None); - assert_eq!(a.saturating_duration_since(z), Duration::from_secs(2)); - assert_eq!(z.saturating_duration_since(a), Duration::ZERO); - - assert_eq!(z + Duration::from_secs(2), a); - assert_eq!(a - Duration::from_secs(2), z); - assert_eq!(a - z, Duration::from_secs(2)); - assert_eq!(z - a, Duration::ZERO); // saturates via Sub - let mut c = z; - c += Duration::from_secs(2); - assert_eq!(c, a); - let mut d = a; - d -= Duration::from_secs(2); - assert_eq!(d, z); - - // nanosecond carry - assert_eq!( - StreamInstant::new(1, 1_500_000_000), - StreamInstant::new(2, 500_000_000) - ); - assert_eq!( - StreamInstant::new(0, 1_000_000_000), - StreamInstant::new(1, 0) - ); - - // basic round-trip - assert_eq!( - StreamInstant::from_secs_f64(1.5), - StreamInstant::new(1, 500_000_000) - ); - assert_eq!(StreamInstant::from_secs_f64(0.0), z); -} - -#[test] -#[should_panic] -fn test_stream_instant_new_overflow() { - StreamInstant::new(u64::MAX, 1_000_000_000); // carry overflows u64 -} - -#[test] -#[should_panic] -fn test_stream_instant_from_secs_f64_negative() { - StreamInstant::from_secs_f64(-1.0); -} - -#[test] -#[should_panic] -fn test_stream_instant_from_secs_f64_nan() { - StreamInstant::from_secs_f64(f64::NAN); -} - -#[test] -#[should_panic] -fn test_stream_instant_from_secs_f64_infinite() { - StreamInstant::from_secs_f64(f64::INFINITY); -} diff --git a/src/timestamp.rs b/src/timestamp.rs new file mode 100644 index 000000000..d948035b8 --- /dev/null +++ b/src/timestamp.rs @@ -0,0 +1,348 @@ +use std::time::Duration; + +#[cfg(all( + target_arch = "wasm32", + any(target_os = "emscripten", feature = "wasm-bindgen") +))] +use wasm_bindgen::prelude::*; + +/// A monotonic time instance associated with a stream, retrieved from either: +/// +/// 1. A timestamp provided to the stream's underlying audio data callback or +/// 2. The same time source used to generate timestamps for a stream's underlying audio data +/// callback. +/// +/// `StreamInstant` represents a moment on a stream's monotonic clock. Because the underlying clock +/// is monotonic, `StreamInstant` values are always positive and increasing. +/// +/// Within a single stream, all instants share the same clock, so arithmetic between them is +/// meaningful. Across different streams, origins are not guaranteed to be shared. On some hosts +/// each stream starts its own independent clock at zero, so subtracting a timestamp from one +/// stream and one from another may produce a meaningless result. +/// +/// ## Time sources by host +/// +/// | Host | Time source | +/// | ---- | ----------- | +/// | AAudio | `AAudioStream_getTimestamp(CLOCK_MONOTONIC)` | +/// | ALSA | `snd_pcm_status_get_htstamp()` | +/// | ASIO | `timeGetTime()` | +/// | AudioWorklet | `AudioContext.currentTime` | +/// | CoreAudio | `mach_absolute_time()` | +/// | Emscripten | `AudioContext.currentTime` | +/// | JACK | `jack_get_time()` | +/// | PipeWire | `pw_stream_get_time_n()` | +/// | PulseAudio | `std::time::Instant` | +/// | WASAPI | `QueryPerformanceCounter()` | +/// | WebAudio | `AudioContext.currentTime` | +/// +/// > **Disclaimer:** These system calls might change over time. +/// +/// > **Note:** The `+` and `-` operators on `StreamInstant` may panic if the result cannot be +/// > represented as a `StreamInstant`. Use [`checked_add`][StreamInstant::checked_add] or +/// > [`checked_sub`][StreamInstant::checked_sub] for non-panicking variants. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub struct StreamInstant { + secs: u64, + nanos: u32, +} + +/// A timestamp associated with a call to an input stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct InputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The instant that data was captured from the device. + /// + /// E.g. The instant data was read from an ADC. + pub capture: StreamInstant, +} + +/// A timestamp associated with a call to an output stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The predicted instant that data written will be delivered to the device for playback. + /// + /// E.g. The instant data will be played by a DAC. + pub playback: StreamInstant, +} + +/// Information relevant to a single call to the user's input stream data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct InputCallbackInfo { + pub(crate) timestamp: InputStreamTimestamp, +} + +/// Information relevant to a single call to the user's output stream data callback. +#[cfg_attr(target_os = "emscripten", wasm_bindgen)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputCallbackInfo { + pub(crate) timestamp: OutputStreamTimestamp, +} + +impl StreamInstant { + /// Returns the amount of time elapsed from `earlier` to `self`, or `None` if `earlier` is + /// later than `self`. + pub fn checked_duration_since(&self, earlier: StreamInstant) -> Option { + if self < &earlier { + return None; + } + let delta = self.as_nanos() - earlier.as_nanos(); + let secs = u64::try_from(delta / 1_000_000_000).ok()?; + let subsec_nanos = (delta % 1_000_000_000) as u32; + Some(Duration::new(secs, subsec_nanos)) + } + + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn saturating_duration_since(&self, earlier: StreamInstant) -> Duration { + self.checked_duration_since(earlier).unwrap_or_default() + } + + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn duration_since(&self, earlier: StreamInstant) -> Duration { + self.saturating_duration_since(earlier) + } + + /// Returns `Some(t)` where `t` is `self + duration`, or `None` if the result cannot be + /// represented as a `StreamInstant`. + pub fn checked_add(&self, duration: Duration) -> Option { + let total = self.as_nanos().checked_add(duration.as_nanos())?; + let secs = u64::try_from(total / 1_000_000_000).ok()?; + let nanos = (total % 1_000_000_000) as u32; + Some(StreamInstant { secs, nanos }) + } + + /// Returns `Some(t)` where `t` is `self - duration`, or `None` if the result cannot be + /// represented as a `StreamInstant` (i.e. would be negative). + pub fn checked_sub(&self, duration: Duration) -> Option { + let total = self.as_nanos().checked_sub(duration.as_nanos())?; + let secs = u64::try_from(total / 1_000_000_000).ok()?; + let nanos = (total % 1_000_000_000) as u32; + Some(StreamInstant { secs, nanos }) + } + + /// Returns the total number of nanoseconds contained by this `StreamInstant`. + pub fn as_nanos(&self) -> u128 { + self.secs as u128 * 1_000_000_000 + self.nanos as u128 + } + + /// Creates a new `StreamInstant` from the specified number of nanoseconds. + /// + /// Note: Using this on the return value of `as_nanos()` might cause unexpected behavior: + /// `as_nanos()` returns a `u128`, and can return values that do not fit in `u64`, e.g. 585 + /// years. Instead, consider using the pattern + /// `StreamInstant::new(t.as_secs(), t.subsec_nanos())` if you cannot copy/clone the + /// `StreamInstant` directly. + pub fn from_nanos(nanos: u64) -> Self { + let secs = nanos / 1_000_000_000; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + Self::new(secs, subsec_nanos) + } + + /// Creates a new `StreamInstant` from the specified number of seconds represented as `f64`. + /// + /// # Panics + /// + /// Panics if `secs` is negative, not finite, or overflows the range of `StreamInstant`. + pub fn from_secs_f64(secs: f64) -> Self { + const NANOS_PER_SEC: u128 = 1_000_000_000; + const MAX_NANOS: f64 = ((u64::MAX as u128 + 1) * NANOS_PER_SEC) as f64; + let nanos = secs * NANOS_PER_SEC as f64; + if !(0.0..MAX_NANOS).contains(&nanos) { + panic!("StreamInstant::from_secs_f64 called with invalid value: {secs}"); + } + let nanos = nanos as u128; + Self::new( + (nanos / NANOS_PER_SEC) as u64, + (nanos % NANOS_PER_SEC) as u32, + ) + } + + /// Creates a new `StreamInstant` from the specified number of whole seconds and additional + /// nanoseconds. + /// + /// If `nanos` is greater than or equal to 1 billion (the number of nanoseconds in a second), + /// the excess carries over into `secs`. + /// + /// # Panics + /// + /// Panics if the carry from `nanos` overflows the seconds counter. + pub fn new(secs: u64, nanos: u32) -> Self { + let carry = nanos / 1_000_000_000; + let subsec_nanos = nanos % 1_000_000_000; + let secs = secs + .checked_add(carry as u64) + .expect("overflow in StreamInstant::new"); + StreamInstant { + secs, + nanos: subsec_nanos, + } + } +} + +impl std::ops::Add for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result overflows the range of `StreamInstant`. Use + /// [`checked_add`][StreamInstant::checked_add] for a non-panicking variant. + #[inline] + fn add(self, rhs: Duration) -> StreamInstant { + self.checked_add(rhs) + .expect("overflow when adding duration to stream instant") + } +} + +impl std::ops::AddAssign for StreamInstant { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl std::ops::Sub for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result underflows the range of `StreamInstant`. Use + /// [`checked_sub`][StreamInstant::checked_sub] for a non-panicking variant. + #[inline] + fn sub(self, rhs: Duration) -> StreamInstant { + self.checked_sub(rhs) + .expect("overflow when subtracting duration from stream instant") + } +} + +impl std::ops::SubAssign for StreamInstant { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl std::ops::Sub for StreamInstant { + type Output = Duration; + + /// Returns the duration from `rhs` to `self`, saturating to [`Duration::ZERO`] if `rhs` is + /// later than `self`. + #[inline] + fn sub(self, rhs: StreamInstant) -> Duration { + self.saturating_duration_since(rhs) + } +} + +impl InputCallbackInfo { + pub fn new(timestamp: InputStreamTimestamp) -> Self { + Self { timestamp } + } + + /// The timestamp associated with the call to an input stream's data callback. + pub fn timestamp(&self) -> InputStreamTimestamp { + self.timestamp + } +} + +impl OutputCallbackInfo { + pub fn new(timestamp: OutputStreamTimestamp) -> Self { + Self { timestamp } + } + + /// The timestamp associated with the call to an output stream's data callback. + pub fn timestamp(&self) -> OutputStreamTimestamp { + self.timestamp + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stream_instant() { + let z = StreamInstant::new(0, 0); // origin + let a = StreamInstant::new(2, 0); + let max = StreamInstant::new(u64::MAX, 999_999_999); // largest representable instant + + assert_eq!( + a.checked_sub(Duration::from_secs(1)), + Some(StreamInstant::new(1, 0)) + ); + assert_eq!( + a.checked_sub(Duration::from_secs(2)), + Some(StreamInstant::new(0, 0)) + ); + assert_eq!(a.checked_sub(Duration::from_secs(3)), None); // would go below zero + assert_eq!(z.checked_sub(Duration::from_nanos(1)), None); // underflow at origin + + assert_eq!( + a.checked_add(Duration::from_secs(1)), + Some(StreamInstant::new(3, 0)) + ); + assert_eq!(max.checked_add(Duration::from_nanos(1)), None); // overflow + + assert_eq!(a.duration_since(z), Duration::from_secs(2)); + assert_eq!(z.duration_since(a), Duration::ZERO); // saturates + assert_eq!(a.checked_duration_since(z), Some(Duration::from_secs(2))); + assert_eq!(z.checked_duration_since(a), None); + assert_eq!(a.saturating_duration_since(z), Duration::from_secs(2)); + assert_eq!(z.saturating_duration_since(a), Duration::ZERO); + + assert_eq!(z + Duration::from_secs(2), a); + assert_eq!(a - Duration::from_secs(2), z); + assert_eq!(a - z, Duration::from_secs(2)); + assert_eq!(z - a, Duration::ZERO); // saturates via Sub + let mut c = z; + c += Duration::from_secs(2); + assert_eq!(c, a); + let mut d = a; + d -= Duration::from_secs(2); + assert_eq!(d, z); + + // nanosecond carry + assert_eq!( + StreamInstant::new(1, 1_500_000_000), + StreamInstant::new(2, 500_000_000) + ); + assert_eq!( + StreamInstant::new(0, 1_000_000_000), + StreamInstant::new(1, 0) + ); + + // basic round-trip + assert_eq!( + StreamInstant::from_secs_f64(1.5), + StreamInstant::new(1, 500_000_000) + ); + assert_eq!(StreamInstant::from_secs_f64(0.0), z); + } + + #[test] + #[should_panic] + fn test_stream_instant_new_overflow() { + StreamInstant::new(u64::MAX, 1_000_000_000); // carry overflows u64 + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_negative() { + StreamInstant::from_secs_f64(-1.0); + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_nan() { + StreamInstant::from_secs_f64(f64::NAN); + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_infinite() { + StreamInstant::from_secs_f64(f64::INFINITY); + } +} From 8758b158c2f2677f6a451f3fb9ba9acabb3b2e70 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 30 Mar 2026 21:39:16 +0200 Subject: [PATCH 03/17] refactor: address review comments --- UPGRADING.md | 1 - examples/custom.rs | 2 +- src/host/aaudio/convert.rs | 3 ++- src/host/alsa/mod.rs | 2 +- src/host/asio/mod.rs | 2 +- src/host/coreaudio/mod.rs | 6 +++++- src/host/jack/stream.rs | 2 +- src/host/null/mod.rs | 4 ++++ src/host/pipewire/stream.rs | 2 -- src/host/wasapi/stream.rs | 24 +++++++++++++++++++----- 10 files changed, 34 insertions(+), 14 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index ccd1c3b0e..1ca193f2e 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -13,7 +13,6 @@ This guide covers breaking changes requiring code updates. See [CHANGELOG.md](CH - [ ] Change `instant.sub(d)` to `instant.checked_sub(d)` (or use `instant - d`). - [ ] Update `StreamInstant::new(secs, nanos)` call sites: `secs` is now `u64`. - [ ] Update `StreamInstant::from_nanos(nanos)` call sites: `nanos` is now `u64`. -- [ ] Remove any uses of `StreamInstant::as_nanos()` — see migration note below. - [ ] Update `duration_since` call sites to pass by value (drop the `&`). ## 1. Error enums are now `#[non_exhaustive]` diff --git a/examples/custom.rs b/examples/custom.rs index 5817517a3..e1f40d045 100644 --- a/examples/custom.rs +++ b/examples/custom.rs @@ -201,7 +201,7 @@ impl StreamTrait for MyStream { fn now(&self) -> cpal::StreamInstant { let elapsed = self.start.elapsed(); - cpal::StreamInstant::from_nanos(elapsed.as_nanos() as u64) + cpal::StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()) } } diff --git a/src/host/aaudio/convert.rs b/src/host/aaudio/convert.rs index 65669212c..841c6867e 100644 --- a/src/host/aaudio/convert.rs +++ b/src/host/aaudio/convert.rs @@ -13,7 +13,8 @@ pub fn now_stream_instant() -> StreamInstant { tv_sec: 0, tv_nsec: 0, }; - unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + let res = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + assert_eq!(res, 0, "clock_gettime(CLOCK_MONOTONIC) failed"); StreamInstant::from_nanos(ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64) } diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index 5c03fc566..c94203b0f 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1158,7 +1158,7 @@ fn stream_timestamp_fallback( ) -> Result { let now = std::time::Instant::now(); let duration = now.duration_since(creation); - Ok(crate::StreamInstant::from_nanos(duration.as_nanos() as u64)) + Ok(crate::StreamInstant::new(duration.as_secs(), duration.subsec_nanos())) } // Adapted from `timestamp2ns` here: diff --git a/src/host/asio/mod.rs b/src/host/asio/mod.rs index aafdf4225..4ecdb0f52 100644 --- a/src/host/asio/mod.rs +++ b/src/host/asio/mod.rs @@ -161,7 +161,7 @@ impl StreamTrait for Stream { // derived from `timeGetTime()`, so calling it here gives a value on the // same clock as the `system_time` field delivered to every callback. let ms = unsafe { windows::Win32::Media::timeGetTime() }; - crate::StreamInstant::from_nanos(ms as i64 * 1_000_000) + crate::StreamInstant::from_nanos(ms as u64 * 1_000_000) } fn buffer_size(&self) -> Option { diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index 579f24e27..3255a7992 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -81,7 +81,11 @@ fn host_time_to_stream_instant( let res = unsafe { mach2::mach_time::mach_timebase_info(&mut info) }; check_os_status(res)?; let nanos = m_host_time as u128 * info.numer as u128 / info.denom as u128; - Ok(crate::StreamInstant::from_nanos(nanos as u64)) + let secs = u64::try_from(nanos / 1_000_000_000).map_err(|_| BackendSpecificError { + description: "mach absolute time overflow".to_string(), + })?; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + Ok(crate::StreamInstant::new(secs, subsec_nanos)) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index d33a048ed..b9d8ca59e 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -404,7 +404,7 @@ impl jack::ProcessHandler for LocalProcessHandler { } fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { - crate::StreamInstant::from_nanos((micros as u128 * 1_000) as u64) + crate::StreamInstant::new(micros / 1_000_000, ((micros % 1_000_000) * 1_000) as u32) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/null/mod.rs b/src/host/null/mod.rs index a42ae9bd2..ac5cbe57f 100644 --- a/src/host/null/mod.rs +++ b/src/host/null/mod.rs @@ -143,6 +143,10 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), PauseStreamError> { unimplemented!() } + + fn now(&self) -> crate::StreamInstant { + unimplemented!() + } } impl Iterator for Devices { diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 1170708c5..449beea06 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -6,8 +6,6 @@ use std::{ thread::JoinHandle, }; -use libc; - use crate::{ host::fill_with_equilibrium, traits::StreamTrait, BackendSpecificError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, StreamConfig, StreamError, StreamInstant, diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 20c4b2942..5fbea98d9 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -34,6 +34,9 @@ pub struct Stream { // Callback size in frames. period_frames: FrameCount, + + // QueryPerformanceFrequency result, cached at construction (constant for the system lifetime). + qpc_frequency: u64, } // SAFETY: Windows Event HANDLEs are safe to send between threads - they are designed for @@ -125,6 +128,11 @@ impl Stream { let (tx, rx) = channel(); let period_frames = stream_inner.period_frames; + let mut qpc_frequency: i64 = 0; + unsafe { + Performance::QueryPerformanceFrequency(&mut qpc_frequency) + .expect("QueryPerformanceFrequency failed"); + } let run_context = RunContext { handles: vec![pending_scheduled_event, stream_inner.event], @@ -142,6 +150,7 @@ impl Stream { commands: tx, pending_scheduled_event, period_frames, + qpc_frequency: qpc_frequency as u64, } } @@ -161,6 +170,11 @@ impl Stream { let (tx, rx) = channel(); let period_frames = stream_inner.period_frames; + let mut qpc_frequency: i64 = 0; + unsafe { + Performance::QueryPerformanceFrequency(&mut qpc_frequency) + .expect("QueryPerformanceFrequency failed"); + } let run_context = RunContext { handles: vec![pending_scheduled_event, stream_inner.event], @@ -178,6 +192,7 @@ impl Stream { commands: tx, pending_scheduled_event, period_frames, + qpc_frequency: qpc_frequency as u64, } } @@ -218,15 +233,14 @@ impl StreamTrait for Stream { fn now(&self) -> crate::StreamInstant { let mut counter: i64 = 0; - let mut freq: i64 = 0; unsafe { Performance::QueryPerformanceCounter(&mut counter) .expect("QueryPerformanceCounter failed"); - Performance::QueryPerformanceFrequency(&mut freq) - .expect("QueryPerformanceFrequency failed"); } - let nanos = (counter as u128 * 1_000_000_000 / freq as u128) as u64; - crate::StreamInstant::from_nanos(nanos) + let nanos = counter as u128 * 1_000_000_000 / self.qpc_frequency as u128; + let secs = (nanos / 1_000_000_000) as u64; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + crate::StreamInstant::new(secs, subsec_nanos) } fn buffer_size(&self) -> Option { From 7079f93c33487b78a3696e00bee7619d078df53a Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 30 Mar 2026 21:40:16 +0200 Subject: [PATCH 04/17] style: cargo fmt --- src/host/alsa/mod.rs | 5 ++++- src/lib.rs | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index c94203b0f..b686cec11 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1158,7 +1158,10 @@ fn stream_timestamp_fallback( ) -> Result { let now = std::time::Instant::now(); let duration = now.duration_since(creation); - Ok(crate::StreamInstant::new(duration.as_secs(), duration.subsec_nanos())) + Ok(crate::StreamInstant::new( + duration.as_secs(), + duration.subsec_nanos(), + )) } // Adapted from `timestamp2ns` here: diff --git a/src/lib.rs b/src/lib.rs index 8cd710d4e..3b1a572bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,9 +192,9 @@ pub use samples_formats::{FromSample, Sample, SampleFormat, SizedSample, I24, U2 pub mod device_description; mod error; mod host; -mod timestamp; pub mod platform; mod samples_formats; +mod timestamp; pub mod traits; /// Iterator of devices wrapped in a filter to only include certain device types @@ -513,7 +513,6 @@ impl SupportedStreamConfig { } } - // Note: Data does not implement `is_empty()` because it always contains a valid audio buffer // by design. The buffer may contain silence, but it is never structurally empty. #[allow(clippy::len_without_is_empty)] @@ -856,4 +855,3 @@ pub(crate) const COMMON_SAMPLE_RATES: &[SampleRate] = &[ 5512, 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000, 64000, 88200, 96000, 176400, 192000, 352800, 384000, 705600, 768000, 1411200, 1536000, ]; - From a172e77ed22c8cb29b8268b2fb8a5a37a006d7ba Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 30 Mar 2026 21:48:08 +0200 Subject: [PATCH 05/17] refactor: clippy fixes --- src/timestamp.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/timestamp.rs b/src/timestamp.rs index d948035b8..bbd608a5e 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -1,9 +1,6 @@ use std::time::Duration; -#[cfg(all( - target_arch = "wasm32", - any(target_os = "emscripten", feature = "wasm-bindgen") -))] +#[cfg(target_os = "emscripten")] use wasm_bindgen::prelude::*; /// A monotonic time instance associated with a stream, retrieved from either: From 9113a058d9d774b54d95e73dbad12792f395db0c Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 22:16:45 +0200 Subject: [PATCH 06/17] fix(alsa): avoid underflow when computing capture timestamp --- src/host/alsa/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index b686cec11..f3a4d411b 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1064,7 +1064,9 @@ fn process_input( stream_timestamp_fallback(stream.creation_instant) }?; let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); - let capture = callback - delay_duration; + let capture = callback + .checked_sub(delay_duration) + .unwrap_or(crate::StreamInstant::new(0, 0)); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); From 8389bb263999a08cd0ee6385614411665f6d82e1 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 22:20:47 +0200 Subject: [PATCH 07/17] fix(wasapi): avoid truncating large nanosecond values --- src/host/wasapi/stream.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 5fbea98d9..f62d86d48 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -132,6 +132,7 @@ impl Stream { unsafe { Performance::QueryPerformanceFrequency(&mut qpc_frequency) .expect("QueryPerformanceFrequency failed"); + debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero"); } let run_context = RunContext { @@ -174,6 +175,7 @@ impl Stream { unsafe { Performance::QueryPerformanceFrequency(&mut qpc_frequency) .expect("QueryPerformanceFrequency failed"); + debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero"); } let run_context = RunContext { @@ -598,9 +600,12 @@ fn stream_instant(stream: &StreamInner) -> Result)?; }; - // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = (qpc_position as u128 * 100) as u64; - let instant = crate::StreamInstant::from_nanos(qpc_nanos); + // The `qpc_position` is in 100-nanosecond units. + let nanos = qpc_position as u128 * 100; + let instant = crate::StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ); Ok(instant) } @@ -613,9 +618,12 @@ fn input_timestamp( stream: &StreamInner, buffer_qpc_position: u64, ) -> Result { - // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = (buffer_qpc_position as u128 * 100) as u64; - let capture = crate::StreamInstant::from_nanos(qpc_nanos); + // The `qpc_position` is in 100-nanosecond units. + let nanos = buffer_qpc_position as u128 * 100; + let capture = crate::StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ); let callback = stream_instant(stream)?; Ok(crate::InputStreamTimestamp { capture, callback }) } From 9c91f64603ddeec9098137c360fec662a285c3bb Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 23:00:09 +0200 Subject: [PATCH 08/17] feat: add StreamInstant::from_micros and from_millis --- src/host/jack/stream.rs | 2 +- src/timestamp.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index b9d8ca59e..83cdb3457 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -404,7 +404,7 @@ impl jack::ProcessHandler for LocalProcessHandler { } fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { - crate::StreamInstant::new(micros / 1_000_000, ((micros % 1_000_000) * 1_000) as u32) + crate::StreamInstant::from_micros(micros) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/timestamp.rs b/src/timestamp.rs index bbd608a5e..4bbcfc58d 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -140,6 +140,16 @@ impl StreamInstant { Self::new(secs, subsec_nanos) } + /// Creates a new `StreamInstant` from the specified number of milliseconds. + pub fn from_millis(millis: u64) -> Self { + Self::new(millis / 1_000, (millis % 1_000 * 1_000_000) as u32) + } + + /// Creates a new `StreamInstant` from the specified number of microseconds. + pub fn from_micros(micros: u64) -> Self { + Self::new(micros / 1_000_000, (micros % 1_000_000 * 1_000) as u32) + } + /// Creates a new `StreamInstant` from the specified number of seconds represented as `f64`. /// /// # Panics From d9e12d8c04c06d3eba149ff99dbe73cc20e6b602 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 23:39:51 +0200 Subject: [PATCH 09/17] refactor: add StreamInstant::ZERO constant --- src/host/alsa/mod.rs | 2 +- src/timestamp.rs | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index f3a4d411b..85ce5dcac 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1066,7 +1066,7 @@ fn process_input( let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); let capture = callback .checked_sub(delay_duration) - .unwrap_or(crate::StreamInstant::new(0, 0)); + .unwrap_or(crate::StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); diff --git a/src/timestamp.rs b/src/timestamp.rs index 4bbcfc58d..783ad1829 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -80,6 +80,9 @@ pub struct OutputCallbackInfo { } impl StreamInstant { + /// A `StreamInstant` with `secs` and `nanos` both set to zero. + pub const ZERO: Self = Self { secs: 0, nanos: 0 }; + /// Returns the amount of time elapsed from `earlier` to `self`, or `None` if `earlier` is /// later than `self`. pub fn checked_duration_since(&self, earlier: StreamInstant) -> Option { @@ -272,7 +275,7 @@ mod tests { #[test] fn test_stream_instant() { - let z = StreamInstant::new(0, 0); // origin + let z = StreamInstant::ZERO; // origin let a = StreamInstant::new(2, 0); let max = StreamInstant::new(u64::MAX, 999_999_999); // largest representable instant @@ -282,7 +285,7 @@ mod tests { ); assert_eq!( a.checked_sub(Duration::from_secs(2)), - Some(StreamInstant::new(0, 0)) + Some(StreamInstant::ZERO) ); assert_eq!(a.checked_sub(Duration::from_secs(3)), None); // would go below zero assert_eq!(z.checked_sub(Duration::from_nanos(1)), None); // underflow at origin From c6da114338bf0209aa6e9e838294e90ebf408f21 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 23:50:12 +0200 Subject: [PATCH 10/17] fix: avoid underflow in timestamps --- src/host/coreaudio/ios/mod.rs | 9 +++++---- src/host/coreaudio/macos/device.rs | 4 +++- src/host/pipewire/stream.rs | 4 +++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index 2ead1160d..5fcd4af33 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -16,8 +16,9 @@ use crate::{ BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, - SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, + PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, }; use self::enumerate::{ @@ -279,7 +280,7 @@ impl StreamTrait for Stream { Ok(()) } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { let m_host_time = unsafe { mach2::mach_time::mach_absolute_time() }; host_time_to_stream_instant(m_host_time).expect("mach_timebase_info failed") } @@ -506,7 +507,7 @@ where } }); let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - delay; + let capture = callback.checked_sub(delay).unwrap_or(StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index f08f47103..7793588ce 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -796,7 +796,9 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - delay; + let capture = callback + .checked_sub(delay) + .unwrap_or(crate::StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 449beea06..ec5d585eb 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -220,7 +220,9 @@ where let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { description: "clock_gettime failed".to_owned(), })?; - let pl = cb - frames_to_duration(frames, self.format.rate()); + let pl = cb + .checked_sub(frames_to_duration(frames, self.format.rate())) + .unwrap_or(crate::StreamInstant::ZERO); (cb, pl) } }; From f320f980bc310de1d30ac9d912dfcc48c9a5ad3b Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 23:54:22 +0200 Subject: [PATCH 11/17] refactor: import StreamInstant into scope --- src/host/alsa/mod.rs | 16 ++++++++-------- src/host/audioworklet/mod.rs | 8 ++++---- src/host/custom/mod.rs | 8 ++++---- src/host/emscripten/mod.rs | 10 +++++----- src/host/jack/stream.rs | 8 ++++---- src/host/wasapi/stream.rs | 12 ++++++------ src/host/webaudio/mod.rs | 10 +++++----- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index 85ce5dcac..3fc234ad3 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -27,8 +27,8 @@ use crate::{ DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection, DeviceId, DeviceIdError, DeviceNameError, DevicesError, FrameCount, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, - StreamError, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, - SupportedStreamConfigsError, + StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, + SupportedStreamConfigRange, SupportedStreamConfigsError, }; mod enumerate; @@ -1066,7 +1066,7 @@ fn process_input( let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); let capture = callback .checked_sub(delay_duration) - .unwrap_or(crate::StreamInstant::ZERO); + .unwrap_or(StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); @@ -1127,7 +1127,7 @@ fn process_output( #[inline] fn stream_timestamp_hardware( status: &alsa::pcm::Status, -) -> Result { +) -> Result { let trigger_ts = status.get_trigger_htstamp(); // trigger_htstamp records when the PCM stream started. // On the first few callbacks, it might not have been set yet, @@ -1148,7 +1148,7 @@ fn stream_timestamp_hardware( ); return Err(BackendSpecificError { description }); } - Ok(crate::StreamInstant::from_nanos(nanos as u64)) + Ok(StreamInstant::from_nanos(nanos as u64)) } // Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. @@ -1157,10 +1157,10 @@ fn stream_timestamp_hardware( #[inline] fn stream_timestamp_fallback( creation: std::time::Instant, -) -> Result { +) -> Result { let now = std::time::Instant::now(); let duration = now.duration_since(creation); - Ok(crate::StreamInstant::new( + Ok(StreamInstant::new( duration.as_secs(), duration.subsec_nanos(), )) @@ -1277,7 +1277,7 @@ impl StreamTrait for Stream { self.inner.channel.pause(true).ok(); Ok(()) } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { if self.inner.use_hw_timestamps { if let Ok(status) = self.inner.channel.status() { if let Ok(instant) = stream_timestamp_hardware(&status) { diff --git a/src/host/audioworklet/mod.rs b/src/host/audioworklet/mod.rs index 46fa6345f..26154a56b 100644 --- a/src/host/audioworklet/mod.rs +++ b/src/host/audioworklet/mod.rs @@ -14,7 +14,7 @@ use crate::{ BackendSpecificError, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, - SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, + SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; @@ -260,7 +260,7 @@ impl DeviceTrait for Device { Data::from_parts(data, interleaved_data.len(), sample_format) }; - let callback = crate::StreamInstant::from_secs_f64(now); + let callback = StreamInstant::from_secs_f64(now); let buffer_duration = frames_to_duration(frame_size as _, sample_rate); let playback = callback + (buffer_duration @@ -322,8 +322,8 @@ impl StreamTrait for Stream { } } - fn now(&self) -> crate::StreamInstant { - crate::StreamInstant::from_secs_f64(self.audio_context.current_time()) + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.audio_context.current_time()) } } diff --git a/src/host/custom/mod.rs b/src/host/custom/mod.rs index 1a2ac4b02..7b7ebbbd1 100644 --- a/src/host/custom/mod.rs +++ b/src/host/custom/mod.rs @@ -7,7 +7,7 @@ use crate::traits::{DeviceTrait, HostTrait, StreamTrait}; use crate::{ BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleFormat, StreamConfig, StreamError, SupportedStreamConfig, + PlayStreamError, SampleFormat, StreamConfig, StreamError, StreamInstant, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; use core::time::Duration; @@ -177,7 +177,7 @@ trait DeviceErased: Send + Sync { trait StreamErased: Send + Sync { fn play(&self) -> Result<(), PlayStreamError>; fn pause(&self) -> Result<(), PauseStreamError>; - fn now(&self) -> crate::StreamInstant; + fn now(&self) -> StreamInstant; } fn device_to_erased(d: impl DeviceErased + 'static) -> Device { @@ -314,7 +314,7 @@ where ::pause(self) } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { ::now(self) } } @@ -441,7 +441,7 @@ impl StreamTrait for Stream { self.0.pause() } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { self.0.now() } } diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index 58d9d2503..18d9f8793 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -15,8 +15,8 @@ use crate::{ BufferSize, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, - SampleRate, StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig, - SupportedStreamConfigRange, SupportedStreamConfigsError, + SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, + SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; // The emscripten backend currently works by instantiating an `AudioContext` object per `Stream`. @@ -279,8 +279,8 @@ impl StreamTrait for Stream { Ok(()) } - fn now(&self) -> crate::StreamInstant { - crate::StreamInstant::from_secs_f64(self.audio_ctxt.current_time()) + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.audio_ctxt.current_time()) } } @@ -303,7 +303,7 @@ where let data = temporary_buffer.as_mut_ptr() as *mut (); let mut data = unsafe { Data::from_parts(data, len, sample_format) }; let now_secs: f64 = audio_ctxt.current_time(); - let callback = crate::StreamInstant::from_secs_f64(now_secs); + let callback = StreamInstant::from_secs_f64(now_secs); // TODO: Use proper latency instead. Currently, unsupported on most browsers though, so // we estimate based on buffer size instead. Probably should use this, but it's only // supported by firefox (2020-04-28). diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index 83cdb3457..6659cea2b 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use crate::{ BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleRate, StreamError, + PlayStreamError, SampleRate, StreamError, StreamInstant, }; use super::JACK_SAMPLE_FORMAT; @@ -221,7 +221,7 @@ impl StreamTrait for Stream { Ok(()) } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { micros_to_stream_instant(self.async_client.as_client().time()) } @@ -403,8 +403,8 @@ impl jack::ProcessHandler for LocalProcessHandler { } } -fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { - crate::StreamInstant::from_micros(micros) +fn micros_to_stream_instant(micros: u64) -> StreamInstant { + StreamInstant::from_micros(micros) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index f62d86d48..c0d38bc63 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -2,7 +2,7 @@ use super::windows_err_to_cpal_err; use crate::traits::StreamTrait; use crate::{ BackendSpecificError, BufferSize, Data, FrameCount, InputCallbackInfo, OutputCallbackInfo, - PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamError, + PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamError, StreamInstant, }; use std::mem; use std::ptr; @@ -233,7 +233,7 @@ impl StreamTrait for Stream { Ok(()) } - fn now(&self) -> crate::StreamInstant { + fn now(&self) -> StreamInstant { let mut counter: i64 = 0; unsafe { Performance::QueryPerformanceCounter(&mut counter) @@ -242,7 +242,7 @@ impl StreamTrait for Stream { let nanos = counter as u128 * 1_000_000_000 / self.qpc_frequency as u128; let secs = (nanos / 1_000_000_000) as u64; let subsec_nanos = (nanos % 1_000_000_000) as u32; - crate::StreamInstant::new(secs, subsec_nanos) + StreamInstant::new(secs, subsec_nanos) } fn buffer_size(&self) -> Option { @@ -591,7 +591,7 @@ fn frames_to_duration(frames: FrameCount, rate: SampleRate) -> Duration { /// Use the stream's `IAudioClock` to produce the current stream instant. /// /// Uses the QPC position produced via the `GetPosition` method. -fn stream_instant(stream: &StreamInner) -> Result { +fn stream_instant(stream: &StreamInner) -> Result { let mut position: u64 = 0; let mut qpc_position: u64 = 0; unsafe { @@ -602,7 +602,7 @@ fn stream_instant(stream: &StreamInner) -> Result Result { // The `qpc_position` is in 100-nanosecond units. let nanos = buffer_qpc_position as u128 * 100; - let capture = crate::StreamInstant::new( + let capture = StreamInstant::new( (nanos / 1_000_000_000) as u64, (nanos % 1_000_000_000) as u32, ); diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index 33fd7c3c2..745b500dd 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -14,7 +14,7 @@ use crate::{ BackendSpecificError, BufferSize, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, - SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, + SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; use std::ops::DerefMut; @@ -353,8 +353,8 @@ impl DeviceTrait for Device { 0.0 } }; - let callback = crate::StreamInstant::from_secs_f64(now); - let playback = crate::StreamInstant::from_secs_f64( + let callback = StreamInstant::from_secs_f64(now); + let playback = StreamInstant::from_secs_f64( time_at_start_of_buffer + total_hw_latency_secs, ); let timestamp = crate::OutputStreamTimestamp { callback, playback }; @@ -496,8 +496,8 @@ impl StreamTrait for Stream { } } - fn now(&self) -> crate::StreamInstant { - crate::StreamInstant::from_secs_f64(self.ctx.current_time()) + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.ctx.current_time()) } fn buffer_size(&self) -> Option { From a3588537a7571daf815e1d2e0468616a8d6808f4 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 3 Apr 2026 23:59:42 +0200 Subject: [PATCH 12/17] fix(asio): prevent timestamp wraparounds --- src/host/asio/mod.rs | 6 +- src/host/asio/stream.rs | 128 +++++++++++++++++++++++++++++++++------- 2 files changed, 108 insertions(+), 26 deletions(-) diff --git a/src/host/asio/mod.rs b/src/host/asio/mod.rs index 4ecdb0f52..11d0783a0 100644 --- a/src/host/asio/mod.rs +++ b/src/host/asio/mod.rs @@ -157,11 +157,7 @@ impl StreamTrait for Stream { } fn now(&self) -> crate::StreamInstant { - // `ASIOTimeInfo::systemTime` is specified by the ASIO SDK as nanoseconds - // derived from `timeGetTime()`, so calling it here gives a value on the - // same clock as the `system_time` field delivered to every callback. - let ms = unsafe { windows::Win32::Media::timeGetTime() }; - crate::StreamInstant::from_nanos(ms as u64 * 1_000_000) + Stream::now(self) } fn buffer_size(&self) -> Option { diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 230dcabb8..9152f6369 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -9,11 +9,44 @@ use super::Device; use crate::{ BackendSpecificError, BufferSize, BuildStreamError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, StreamConfig, StreamError, + StreamInstant, }; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +/// Shared state for extending the 32-bit `timeGetTime()` millisecond counter into a +/// monotonic 64-bit value, shared between `now()` and audio callbacks. +pub(super) struct TimeBase { + pub last_ms: AtomicU32, + pub epoch_ms: AtomicU64, +} + +impl TimeBase { + pub fn new() -> Self { + Self { + last_ms: AtomicU32::new(0), + epoch_ms: AtomicU64::new(0), + } + } + + /// Convert a `timeGetTime()` millisecond value to a monotonic `StreamInstant`, + /// extending the 32-bit counter across its ~49.7-day wrap. + fn to_stream_instant(&self, ms: u32) -> StreamInstant { + // `Relaxed` is sufficient: callbacks run on a single ASIO thread. The only + // cross-thread caller is `now()`, which may race at wrap time (~1µs every 49.7 days). + let prev = self.last_ms.swap(ms, Ordering::Relaxed); + let epoch = if ms < prev { + self.epoch_ms + .fetch_add(u32::MAX as u64 + 1, Ordering::Relaxed) + + (u32::MAX as u64 + 1) + } else { + self.epoch_ms.load(Ordering::Relaxed) + }; + StreamInstant::from_millis(epoch + ms as u64) + } +} + pub struct Stream { playing: Arc, // Ensure the `Driver` does not terminate until the last stream is dropped. @@ -22,6 +55,7 @@ pub struct Stream { asio_streams: Arc>, callback_id: sys::BufferCallbackId, driver_event_callback_id: sys::DriverEventCallbackId, + time_base: Arc, } // Compile-time assertion that Stream is Send and Sync @@ -29,6 +63,14 @@ crate::assert_stream_send!(Stream); crate::assert_stream_sync!(Stream); impl Stream { + pub fn now(&self) -> StreamInstant { + // `ASIOTimeInfo::systemTime` is specified by the ASIO SDK as nanoseconds + // derived from `timeGetTime()`, so calling it here gives a value on the + // same clock as the `system_time` field delivered to every callback. + let ms = unsafe { windows::Win32::Media::timeGetTime() }; + self.time_base.to_stream_instant(ms) + } + pub fn play(&self) -> Result<(), PlayStreamError> { self.playing.store(true, Ordering::Release); Ok(()) @@ -92,10 +134,10 @@ impl Device { // Query hardware input latency (order matters: needs buffers created above). // Wrapped in Arc so the message callback can update it on // kAsioLatenciesChanged without touching the buffer callback. - let hardware_input_latency = Arc::new(AtomicUsize::new( + let hardware_input_latency = Arc::new(AtomicU32::new( driver .latencies() - .map(|latencies| latencies.input.max(0) as usize) + .map(|latencies| latencies.input.max(0) as u32) .unwrap_or(0), )); @@ -112,6 +154,9 @@ impl Device { let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; + let time_base = Arc::new(TimeBase::new()); + let time_base_cb = Arc::clone(&time_base); + // Set the input callback. // This is most performance critical part of the ASIO bindings. let callback_id = driver.add_callback(move |callback_info| unsafe { @@ -147,7 +192,10 @@ impl Device { ); } - let hardware_input_latency = hardware_input_latency.load(Ordering::Relaxed); + let hardware_input_latency = hardware_input_latency.load(Ordering::Relaxed) as usize; + + let callback_instant = + time_base_cb.to_stream_instant((callback_info.system_time / 1_000_000) as u32); /// 1. Write from the ASIO buffer to the interleaved CPAL buffer. /// 2. Deliver the CPAL buffer to the user callback. @@ -161,6 +209,7 @@ impl Device { format: SampleFormat, from_endianness: F, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where A: Copy, D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, @@ -183,7 +232,7 @@ impl Device { apply_input_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -201,6 +250,7 @@ impl Device { SampleFormat::I16, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt16MSB, SampleFormat::I16) => { @@ -213,6 +263,7 @@ impl Device { SampleFormat::I16, from_be, hardware_input_latency, + callback_instant, ); } @@ -226,6 +277,7 @@ impl Device { SampleFormat::F32, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTFloat32MSB, SampleFormat::F32) => { @@ -238,6 +290,7 @@ impl Device { SampleFormat::F32, from_be, hardware_input_latency, + callback_instant, ); } @@ -251,6 +304,7 @@ impl Device { SampleFormat::I32, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt32MSB, SampleFormat::I32) => { @@ -263,6 +317,7 @@ impl Device { SampleFormat::I32, from_be, hardware_input_latency, + callback_instant, ); } @@ -276,6 +331,7 @@ impl Device { SampleFormat::F64, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTFloat64MSB, SampleFormat::F64) => { @@ -288,6 +344,7 @@ impl Device { SampleFormat::F64, from_be, hardware_input_latency, + callback_instant, ); } @@ -300,6 +357,7 @@ impl Device { config.sample_rate, true, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt24MSB, SampleFormat::I24) => { @@ -311,6 +369,7 @@ impl Device { config.sample_rate, false, hardware_input_latency, + callback_instant, ); } @@ -333,6 +392,7 @@ impl Device { asio_streams, callback_id, driver_event_callback_id, + time_base: Arc::clone(&time_base), }) } @@ -379,10 +439,10 @@ impl Device { // Query hardware output latency (order matters: needs buffers created above). // Wrapped in Arc so the message callback can update it on // kAsioLatenciesChanged without touching the buffer callback. - let hardware_output_latency = Arc::new(AtomicUsize::new( + let hardware_output_latency = Arc::new(AtomicU32::new( driver .latencies() - .map(|latencies| latencies.output.max(0) as usize) + .map(|latencies| latencies.output.max(0) as u32) .unwrap_or(0), )); @@ -399,6 +459,9 @@ impl Device { let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; + let time_base = Arc::new(TimeBase::new()); + let time_base_cb = Arc::clone(&time_base); + let callback_id = driver.add_callback(move |callback_info| unsafe { // If not playing, return early. if !playing.load(Ordering::Acquire) { @@ -432,7 +495,10 @@ impl Device { ); } - let hardware_output_latency = hardware_output_latency.load(Ordering::Relaxed); + let hardware_output_latency = hardware_output_latency.load(Ordering::Relaxed) as usize; + + let callback_instant = + time_base_cb.to_stream_instant((callback_info.system_time / 1_000_000) as u32); // Silence the ASIO buffer that is about to be used. // @@ -460,6 +526,7 @@ impl Device { format: SampleFormat, mix_samples: F, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where A: Copy, D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, @@ -469,7 +536,7 @@ impl Device { apply_output_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -504,6 +571,7 @@ impl Device { from_le(old_sample).saturating_add(new_sample).to_le() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::I16, &sys::AsioSampleType::ASIOSTInt16MSB) => { @@ -519,6 +587,7 @@ impl Device { from_be(old_sample).saturating_add(new_sample).to_be() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::F32, &sys::AsioSampleType::ASIOSTFloat32LSB) => { @@ -536,6 +605,7 @@ impl Device { .to_le() }, hardware_output_latency, + callback_instant, ); } @@ -554,6 +624,7 @@ impl Device { .to_be() }, hardware_output_latency, + callback_instant, ); } @@ -570,6 +641,7 @@ impl Device { from_le(old_sample).saturating_add(new_sample).to_le() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::I32, &sys::AsioSampleType::ASIOSTInt32MSB) => { @@ -585,6 +657,7 @@ impl Device { from_be(old_sample).saturating_add(new_sample).to_be() }, hardware_output_latency, + callback_instant, ); } @@ -603,6 +676,7 @@ impl Device { .to_le() }, hardware_output_latency, + callback_instant, ); } @@ -621,6 +695,7 @@ impl Device { .to_be() }, hardware_output_latency, + callback_instant, ); } @@ -634,6 +709,7 @@ impl Device { callback_info, config.sample_rate, hardware_output_latency, + callback_instant, ); } @@ -647,6 +723,7 @@ impl Device { callback_info, config.sample_rate, hardware_output_latency, + callback_instant, ); } @@ -669,6 +746,7 @@ impl Device { asio_streams, callback_id, driver_event_callback_id, + time_base: Arc::clone(&time_base), }) } @@ -764,7 +842,7 @@ impl Device { &self, driver: &sys::Driver, error_callback: E, - hardware_latency: Arc, + hardware_latency: Arc, is_input: bool, ) -> sys::DriverEventCallbackId where @@ -807,7 +885,7 @@ impl Device { } else { latencies.output }; - hardware_latency.store(latency.max(0) as usize, Ordering::Relaxed); + hardware_latency.store(latency.max(0) as u32, Ordering::Relaxed); } false } @@ -1007,6 +1085,7 @@ unsafe fn process_output_callback_i24( asio_info: &sys::CallbackInfo, sample_rate: crate::SampleRate, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, { @@ -1015,7 +1094,7 @@ unsafe fn process_output_callback_i24( apply_output_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -1077,6 +1156,7 @@ unsafe fn process_input_callback_i24( sample_rate: crate::SampleRate, little_endian: bool, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, { @@ -1112,7 +1192,7 @@ unsafe fn process_input_callback_i24( apply_input_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -1123,7 +1203,7 @@ unsafe fn process_input_callback_i24( unsafe fn apply_output_callback_to_data( data_callback: &mut D, interleaved: &mut [A], - asio_info: &sys::CallbackInfo, + callback_instant: StreamInstant, sample_rate: crate::SampleRate, sample_format: SampleFormat, hardware_latency_frames: usize, @@ -1136,10 +1216,12 @@ unsafe fn apply_output_callback_to_data( interleaved.len(), sample_format, ); - let callback = crate::StreamInstant::from_nanos(asio_info.system_time); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let playback = callback + delay; - let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let playback = callback_instant + delay; + let timestamp = crate::OutputStreamTimestamp { + callback: callback_instant, + playback, + }; let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); } @@ -1148,7 +1230,7 @@ unsafe fn apply_output_callback_to_data( unsafe fn apply_input_callback_to_data( data_callback: &mut D, interleaved: &mut [A], - asio_info: &sys::CallbackInfo, + callback_instant: StreamInstant, sample_rate: crate::SampleRate, format: SampleFormat, hardware_latency_frames: usize, @@ -1161,10 +1243,14 @@ unsafe fn apply_input_callback_to_data( interleaved.len(), format, ); - let callback = crate::StreamInstant::from_nanos(asio_info.system_time); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let capture = callback - delay; - let timestamp = crate::InputStreamTimestamp { callback, capture }; + let capture = callback_instant + .checked_sub(delay) + .unwrap_or(StreamInstant::ZERO); + let timestamp = crate::InputStreamTimestamp { + callback: callback_instant, + capture, + }; let info = InputCallbackInfo { timestamp }; data_callback(&data, &info); } From 546d55b1d6ec0039367939e95bd43a429bc2dd00 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 4 Apr 2026 00:05:10 +0200 Subject: [PATCH 13/17] fix(asio): clippy fixes --- src/host/asio/stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 9152f6369..44632fbad 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -1148,6 +1148,7 @@ unsafe fn process_output_callback_i24( } } +#[allow(clippy::too_many_arguments)] unsafe fn process_input_callback_i24( data_callback: &mut D, interleaved: &mut [u8], From 3fe13825e1891e5a287d89aee10d62ba92b5b8e4 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 4 Apr 2026 00:23:25 +0200 Subject: [PATCH 14/17] doc: reference saturating_duration_since --- UPGRADING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index 1ca193f2e..4dc35021c 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -92,7 +92,7 @@ if let Some(d) = callback.checked_duration_since(start) { } ``` -**Why:** Mirrors the behavior of `std::time::Instant::duration_since` as of Rust standard library. +**Why:** Mirrors the saturating behavior of `std::time::Instant::saturating_duration_since` in the Rust standard library. ### `add` / `sub` renamed to `checked_add` / `checked_sub`; operator impls added From 701b49942f52e9d7792e35d14e693c3396deede4 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 4 Apr 2026 00:24:39 +0200 Subject: [PATCH 15/17] fix(pipewire): treat negative or zero pw_time as invalid --- src/host/pipewire/stream.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index ec5d585eb..314fa0fcf 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -188,7 +188,7 @@ fn pw_stream_time(stream: &pw::stream::Stream) -> Option { std::mem::size_of::(), ) }; - if rc != 0 || t.now == 0 || t.rate.denom == 0 { + if rc != 0 || t.now <= 0 || t.rate.denom == 0 { return None; } debug_assert_eq!(t.rate.num, 1, "unexpected pw_time rate.num"); @@ -220,10 +220,10 @@ where let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { description: "clock_gettime failed".to_owned(), })?; - let pl = cb + let capture = cb .checked_sub(frames_to_duration(frames, self.format.rate())) .unwrap_or(crate::StreamInstant::ZERO); - (cb, pl) + (cb, capture) } }; let timestamp = crate::InputStreamTimestamp { callback, capture }; From ab250a7227fa4cef58bcd22e7a2c3cf174169ea3 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 4 Apr 2026 00:28:54 +0200 Subject: [PATCH 16/17] fix(wasapi): align timestamps to 100 ns grid --- src/host/wasapi/stream.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index c0d38bc63..d0aa8a8f8 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -239,10 +239,15 @@ impl StreamTrait for Stream { Performance::QueryPerformanceCounter(&mut counter) .expect("QueryPerformanceCounter failed"); } - let nanos = counter as u128 * 1_000_000_000 / self.qpc_frequency as u128; - let secs = (nanos / 1_000_000_000) as u64; - let subsec_nanos = (nanos % 1_000_000_000) as u32; - StreamInstant::new(secs, subsec_nanos) + // Convert to 100-nanosecond units first, matching the precision of WASAPI QPCPosition + // values delivered to callbacks. This keeps `now()` on the same 100 ns grid as + // callback/capture/playback instants, avoiding false sub-100 ns deltas. + let units_100ns = counter as u128 * 10_000_000 / self.qpc_frequency as u128; + let nanos = units_100ns * 100; + StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ) } fn buffer_size(&self) -> Option { From e38d2e458cf500354312ecd323b60207a6336c9d Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 4 Apr 2026 00:29:44 +0200 Subject: [PATCH 17/17] fix(aaudio): prevent overflow constructing StreamInstant --- src/host/aaudio/convert.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/host/aaudio/convert.rs b/src/host/aaudio/convert.rs index 841c6867e..16210ab8a 100644 --- a/src/host/aaudio/convert.rs +++ b/src/host/aaudio/convert.rs @@ -15,7 +15,7 @@ pub fn now_stream_instant() -> StreamInstant { }; let res = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; assert_eq!(res, 0, "clock_gettime(CLOCK_MONOTONIC) failed"); - StreamInstant::from_nanos(ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64) + StreamInstant::new(ts.tv_sec as u64, ts.tv_nsec as u32) } /// Returns the [`StreamInstant`] of the most recent audio frame transferred by `stream`.