diff --git a/CHANGELOG.md b/CHANGELOG.md index c78c8a3a1..ffe9f7e40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `StreamConfig` now implements `Copy`. - `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback. - `device_by_id` is now dispatched to each backend's implementation, allowing to override it. +- `StreamTrait::now()` to query the current instant on the stream's clock. +- `StreamInstant` API changed and extended to mirror `std::time::Instant`/`Duration`. See + [UPGRADING.md](UPGRADING.md) for migration details. - **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`. - **PipeWire**: New host for Linux and some BSDs using the PipeWire API. - **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API. @@ -48,6 +51,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **WebAudio**: Timestamps now include base and output latency. - **WebAudio**: Initial buffer scheduling offset now scales with buffer duration. +### Removed + +- Replaced `StreamInstant::add()` and `sub()` by `checked_add()`/`+` and `checked_sub()`/`-`. + ### Fixed - Reintroduce `audio_thread_priority` feature. diff --git a/Cargo.toml b/Cargo.toml index 8e28c9ca5..118b509da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,12 +85,14 @@ clap = { version = ">=4.0, <=4.5.57", features = ["derive"] } # When updating this, also update the "windows-version" matrix in the CI workflow. [target.'cfg(target_os = "windows")'.dependencies] windows = { version = ">=0.59, <=0.62", features = [ + "Win32_Media", "Win32_Media_Audio", "Win32_Foundation", "Win32_Devices_Properties", "Win32_Media_KernelStreaming", "Win32_System_Com_StructuredStorage", "Win32_System_Threading", + "Win32_System_Performance", "Win32_Security", "Win32_System_SystemServices", "Win32_System_Variant", @@ -183,6 +185,7 @@ ndk = { version = "0.9", default-features = false, features = [ ] } ndk-context = "0.1" jni = "0.21" +libc = "0.2" num-derive = "0.4" num-traits = "0.2" diff --git a/UPGRADING.md b/UPGRADING.md index 4ece2f2f7..4dc35021c 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -8,6 +8,12 @@ This guide covers breaking changes requiring code updates. See [CHANGELOG.md](CH - [ ] Optionally handle the new `DeviceBusy` variant for retryable device errors - [ ] Change `build_*_stream` call sites to pass `StreamConfig` by value (drop the `&`) - [ ] For custom hosts, change `DeviceTrait` implementations to accept `StreamConfig` by value. +- [ ] Remove `instant.duration_since(e)` unwraps; it now returns `Duration` (saturating). +- [ ] Change `instant.add(d)` to `instant.checked_add(d)` (or use `instant + d`). +- [ ] Change `instant.sub(d)` to `instant.checked_sub(d)` (or use `instant - d`). +- [ ] Update `StreamInstant::new(secs, nanos)` call sites: `secs` is now `u64`. +- [ ] Update `StreamInstant::from_nanos(nanos)` call sites: `nanos` is now `u64`. +- [ ] Update `duration_since` call sites to pass by value (drop the `&`). ## 1. Error enums are now `#[non_exhaustive]` @@ -61,6 +67,73 @@ let stream = device.build_output_stream(config, data_fn, err_fn, None)?; If you implement `DeviceTrait` on your own type (via the `custom` feature), update your `build_input_stream_raw` and `build_output_stream_raw` signatures from `config: &StreamConfig` to `config: StreamConfig`. Any `config.clone()` calls before `move` closures can also be removed. +## 4. `StreamInstant` API overhaul + +The `StreamInstant` API has been aligned with `std::time::Instant` and `std::time::Duration`. + +### `duration_since` now returns `Duration` (saturating) + +**What changed:** `duration_since` now returns `Duration` directly, saturating to `Duration::ZERO` +when the argument is later than `self`, instead of returning `Option`. + +```rust +// Before (v0.17): returned Option, argument by reference +if let Some(d) = callback.duration_since(&start) { + println!("elapsed: {d:?}"); +} + +// After (v0.18): returns Duration (saturating), argument by value +let d = callback.duration_since(start); +println!("elapsed: {d:?}"); + +// For the previous Option-returning behaviour, use checked_duration_since: +if let Some(d) = callback.checked_duration_since(start) { + println!("elapsed: {d:?}"); +} +``` + +**Why:** Mirrors the saturating behavior of `std::time::Instant::saturating_duration_since` in the Rust standard library. + +### `add` / `sub` renamed to `checked_add` / `checked_sub`; operator impls added + +**What changed:** The `add` and `sub` methods (which returned `Option`) are replaced by +`checked_add` / `checked_sub` with the same semantics. `+`, `-`, `+=`, and `-=` operator impls +are also added. + +```rust +// Before (v0.17) +let future = instant.add(Duration::from_millis(10)).expect("overflow"); +let past = instant.sub(Duration::from_millis(10)).expect("underflow"); + +// After (v0.18): explicit checked form (same semantics): +let future = instant.checked_add(Duration::from_millis(10)).expect("overflow"); +let past = instant.checked_sub(Duration::from_millis(10)).expect("underflow"); + +// Or use the operator (panics on overflow, like std::time::Instant): +let future = instant + Duration::from_millis(10); +let past = instant - Duration::from_millis(10); + +// Subtract two instants to get a Duration (saturates to zero): +let elapsed: Duration = later - earlier; +``` + +**Why:** Aligns the API with `std::time::Instant`, making `StreamInstant` more idiomatic. + +### `new` and `from_nanos` take unsigned integers + +**What changed:** The `secs` parameter of `StreamInstant::new` and the `nanos` parameter of +`StreamInstant::from_nanos` are now `u64` instead of `i64`. + +```rust +// Before (v0.17): negative seconds were accepted +StreamInstant::new(-1_i64, 0); + +// After (v0.18): all stream clocks are non-negative +StreamInstant::new(0_u64, 0); +``` + +**Why:** All audio host clocks are positive and monotonic; they are never negative. + --- # Upgrading from v0.16 to v0.17 diff --git a/examples/custom.rs b/examples/custom.rs index baf405661..e1f40d045 100644 --- a/examples/custom.rs +++ b/examples/custom.rs @@ -2,6 +2,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +use std::time::Instant; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, @@ -19,7 +20,10 @@ struct MyDevice; // Only Send+Sync is needed struct MyStream { controls: Arc, - // option is needed since joining a thread takes ownership, + // The instant the audio thread was started; shared with now() so that + // callback timestamps and now() are on the same time base. + start: Instant, + // Option is needed since joining a thread takes ownership, // and we want to do that on drop (gives us &mut self, not self) handle: Option>, } @@ -138,9 +142,9 @@ impl DeviceTrait for MyDevice { pause: AtomicBool::new(true), // streams are expected to start out paused by default }); + let start = Instant::now(); let thread_controls = controls.clone(); let handle = std::thread::spawn(move || { - let start = std::time::Instant::now(); let mut buffer = [0.0_f32; 4096]; while !thread_controls.exit.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_secs_f32( @@ -161,7 +165,7 @@ impl DeviceTrait for MyDevice { ) }; - let duration = std::time::Instant::now().duration_since(start); + let duration = Instant::now().duration_since(start); let secs = duration.as_nanos() / 1_000_000_000; let subsec_nanos = duration.as_nanos() - secs * 1_000_000_000; let stream_instant = cpal::StreamInstant::new(secs as _, subsec_nanos as _); @@ -178,6 +182,7 @@ impl DeviceTrait for MyDevice { Ok(MyStream { controls, + start, handle: Some(handle), }) } @@ -193,6 +198,11 @@ impl StreamTrait for MyStream { self.controls.pause.store(true, Ordering::Relaxed); Ok(()) } + + fn now(&self) -> cpal::StreamInstant { + let elapsed = self.start.elapsed(); + cpal::StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()) + } } // streams are expected to stop when dropped @@ -315,9 +325,7 @@ pub fn make_stream( config, move |output: &mut [f32], _: &cpal::OutputCallbackInfo| { // for 0-1s play sine, 1-2s play square, 2-3s play saw, 3-4s play triangle_wave - let time_since_start = std::time::Instant::now() - .duration_since(time_at_start) - .as_secs_f32(); + let time_since_start = Instant::now().duration_since(time_at_start).as_secs_f32(); if time_since_start < 1.0 { oscillator.set_waveform(Waveform::Sine); } else if time_since_start < 2.0 { diff --git a/src/host/aaudio/convert.rs b/src/host/aaudio/convert.rs index 7a085fa13..16210ab8a 100644 --- a/src/host/aaudio/convert.rs +++ b/src/host/aaudio/convert.rs @@ -1,5 +1,4 @@ -use std::convert::TryInto; -use std::time::Duration; +//! Time-conversion helpers for the AAudio backend. extern crate ndk; @@ -8,13 +7,18 @@ use crate::{ StreamInstant, }; -pub fn to_stream_instant(duration: Duration) -> StreamInstant { - StreamInstant::new( - duration.as_secs().try_into().unwrap(), - duration.subsec_nanos(), - ) +/// Returns a [`StreamInstant`] for the current moment. +pub fn now_stream_instant() -> StreamInstant { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + let res = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + assert_eq!(res, 0, "clock_gettime(CLOCK_MONOTONIC) failed"); + StreamInstant::new(ts.tv_sec as u64, ts.tv_nsec as u32) } +/// Returns the [`StreamInstant`] of the most recent audio frame transferred by `stream`. pub fn stream_instant(stream: &ndk::audio::AudioStream) -> StreamInstant { let ts = stream .timestamp(ndk::audio::Clockid::Monotonic) @@ -22,7 +26,7 @@ pub fn stream_instant(stream: &ndk::audio::AudioStream) -> StreamInstant { frame_position: 0, time_nanoseconds: 0, }); - to_stream_instant(Duration::from_nanos(ts.time_nanoseconds as u64)) + StreamInstant::from_nanos(ts.time_nanoseconds as u64) } impl From for StreamError { diff --git a/src/host/aaudio/mod.rs b/src/host/aaudio/mod.rs index 80abe2795..9a2394f2b 100644 --- a/src/host/aaudio/mod.rs +++ b/src/host/aaudio/mod.rs @@ -6,12 +6,12 @@ use std::cmp; use std::convert::TryInto; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::IntoIter as VecIntoIter; extern crate ndk; -use convert::{stream_instant, to_stream_instant}; +use convert::{now_stream_instant, stream_instant}; use java_interface::{AudioDeviceInfo, AudioManager}; use crate::traits::{DeviceTrait, HostTrait, StreamTrait}; @@ -316,13 +316,12 @@ where E: FnMut(StreamError) + Send + 'static, { let builder = configure_for_device(builder, device, config); - let created = Instant::now(); let channel_count = config.channels as i32; let stream = builder .data_callback(Box::new(move |stream, data, num_frames| { let cb_info = InputCallbackInfo { timestamp: InputStreamTimestamp { - callback: to_stream_instant(created.elapsed()), + callback: now_stream_instant(), capture: stream_instant(stream), }, }; @@ -366,7 +365,6 @@ where E: FnMut(StreamError) + Send + 'static, { let builder = configure_for_device(builder, device, config); - let created = Instant::now(); let channel_count = config.channels as i32; let tune_dynamically = config.buffer_size == BufferSize::Default; @@ -378,7 +376,7 @@ where // Deliver audio data to user callback let cb_info = OutputCallbackInfo { timestamp: OutputStreamTimestamp { - callback: to_stream_instant(created.elapsed()), + callback: now_stream_instant(), playback: stream_instant(stream), }, }; @@ -719,6 +717,10 @@ impl StreamTrait for Stream { } } + fn now(&self) -> crate::StreamInstant { + now_stream_instant() + } + fn buffer_size(&self) -> Option { let stream = self.inner.lock().ok()?; diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index a2c801149..3fc234ad3 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -27,8 +27,8 @@ use crate::{ DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection, DeviceId, DeviceIdError, DeviceNameError, DevicesError, FrameCount, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, - StreamError, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, - SupportedStreamConfigsError, + StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, + SupportedStreamConfigRange, SupportedStreamConfigsError, }; mod enumerate; @@ -1065,11 +1065,8 @@ fn process_input( }?; let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); let capture = callback - .sub(delay_duration) - .ok_or_else(|| BackendSpecificError { - description: "`capture` is earlier than representation supported by `StreamInstant`" - .to_string(), - })?; + .checked_sub(delay_duration) + .unwrap_or(StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); @@ -1098,12 +1095,7 @@ fn process_output( stream_timestamp_fallback(stream.creation_instant) }?; let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); - let playback = callback - .add(delay_duration) - .ok_or_else(|| BackendSpecificError { - description: "`playback` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; + let playback = callback + delay_duration; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = crate::OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); @@ -1135,7 +1127,7 @@ fn process_output( #[inline] fn stream_timestamp_hardware( status: &alsa::pcm::Status, -) -> Result { +) -> Result { let trigger_ts = status.get_trigger_htstamp(); // trigger_htstamp records when the PCM stream started. // On the first few callbacks, it might not have been set yet, @@ -1156,7 +1148,7 @@ fn stream_timestamp_hardware( ); return Err(BackendSpecificError { description }); } - Ok(crate::StreamInstant::from_nanos(nanos)) + Ok(StreamInstant::from_nanos(nanos as u64)) } // Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. @@ -1165,12 +1157,13 @@ fn stream_timestamp_hardware( #[inline] fn stream_timestamp_fallback( creation: std::time::Instant, -) -> Result { +) -> Result { let now = std::time::Instant::now(); let duration = now.duration_since(creation); - crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { - description: "stream duration has exceeded `StreamInstant` representation".to_string(), - }) + Ok(StreamInstant::new( + duration.as_secs(), + duration.subsec_nanos(), + )) } // Adapted from `timestamp2ns` here: @@ -1284,6 +1277,18 @@ impl StreamTrait for Stream { self.inner.channel.pause(true).ok(); Ok(()) } + fn now(&self) -> StreamInstant { + if self.inner.use_hw_timestamps { + if let Ok(status) = self.inner.channel.status() { + if let Ok(instant) = stream_timestamp_hardware(&status) { + return instant; + } + } + } + stream_timestamp_fallback(self.inner.creation_instant) + .expect("stream duration exceeded `StreamInstant` range") + } + fn buffer_size(&self) -> Option { Some(self.inner.period_frames as FrameCount) } diff --git a/src/host/asio/mod.rs b/src/host/asio/mod.rs index 4cb4af354..11d0783a0 100644 --- a/src/host/asio/mod.rs +++ b/src/host/asio/mod.rs @@ -156,6 +156,10 @@ impl StreamTrait for Stream { Stream::pause(self) } + fn now(&self) -> crate::StreamInstant { + Stream::now(self) + } + fn buffer_size(&self) -> Option { Stream::buffer_size(self) } diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 04cc1e8d1..44632fbad 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -9,11 +9,44 @@ use super::Device; use crate::{ BackendSpecificError, BufferSize, BuildStreamError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, StreamConfig, StreamError, + StreamInstant, }; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +/// Shared state for extending the 32-bit `timeGetTime()` millisecond counter into a +/// monotonic 64-bit value, shared between `now()` and audio callbacks. +pub(super) struct TimeBase { + pub last_ms: AtomicU32, + pub epoch_ms: AtomicU64, +} + +impl TimeBase { + pub fn new() -> Self { + Self { + last_ms: AtomicU32::new(0), + epoch_ms: AtomicU64::new(0), + } + } + + /// Convert a `timeGetTime()` millisecond value to a monotonic `StreamInstant`, + /// extending the 32-bit counter across its ~49.7-day wrap. + fn to_stream_instant(&self, ms: u32) -> StreamInstant { + // `Relaxed` is sufficient: callbacks run on a single ASIO thread. The only + // cross-thread caller is `now()`, which may race at wrap time (~1µs every 49.7 days). + let prev = self.last_ms.swap(ms, Ordering::Relaxed); + let epoch = if ms < prev { + self.epoch_ms + .fetch_add(u32::MAX as u64 + 1, Ordering::Relaxed) + + (u32::MAX as u64 + 1) + } else { + self.epoch_ms.load(Ordering::Relaxed) + }; + StreamInstant::from_millis(epoch + ms as u64) + } +} + pub struct Stream { playing: Arc, // Ensure the `Driver` does not terminate until the last stream is dropped. @@ -22,6 +55,7 @@ pub struct Stream { asio_streams: Arc>, callback_id: sys::BufferCallbackId, driver_event_callback_id: sys::DriverEventCallbackId, + time_base: Arc, } // Compile-time assertion that Stream is Send and Sync @@ -29,6 +63,14 @@ crate::assert_stream_send!(Stream); crate::assert_stream_sync!(Stream); impl Stream { + pub fn now(&self) -> StreamInstant { + // `ASIOTimeInfo::systemTime` is specified by the ASIO SDK as nanoseconds + // derived from `timeGetTime()`, so calling it here gives a value on the + // same clock as the `system_time` field delivered to every callback. + let ms = unsafe { windows::Win32::Media::timeGetTime() }; + self.time_base.to_stream_instant(ms) + } + pub fn play(&self) -> Result<(), PlayStreamError> { self.playing.store(true, Ordering::Release); Ok(()) @@ -92,10 +134,10 @@ impl Device { // Query hardware input latency (order matters: needs buffers created above). // Wrapped in Arc so the message callback can update it on // kAsioLatenciesChanged without touching the buffer callback. - let hardware_input_latency = Arc::new(AtomicUsize::new( + let hardware_input_latency = Arc::new(AtomicU32::new( driver .latencies() - .map(|latencies| latencies.input.max(0) as usize) + .map(|latencies| latencies.input.max(0) as u32) .unwrap_or(0), )); @@ -112,6 +154,9 @@ impl Device { let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; + let time_base = Arc::new(TimeBase::new()); + let time_base_cb = Arc::clone(&time_base); + // Set the input callback. // This is most performance critical part of the ASIO bindings. let callback_id = driver.add_callback(move |callback_info| unsafe { @@ -147,7 +192,10 @@ impl Device { ); } - let hardware_input_latency = hardware_input_latency.load(Ordering::Relaxed); + let hardware_input_latency = hardware_input_latency.load(Ordering::Relaxed) as usize; + + let callback_instant = + time_base_cb.to_stream_instant((callback_info.system_time / 1_000_000) as u32); /// 1. Write from the ASIO buffer to the interleaved CPAL buffer. /// 2. Deliver the CPAL buffer to the user callback. @@ -161,6 +209,7 @@ impl Device { format: SampleFormat, from_endianness: F, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where A: Copy, D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, @@ -183,7 +232,7 @@ impl Device { apply_input_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -201,6 +250,7 @@ impl Device { SampleFormat::I16, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt16MSB, SampleFormat::I16) => { @@ -213,6 +263,7 @@ impl Device { SampleFormat::I16, from_be, hardware_input_latency, + callback_instant, ); } @@ -226,6 +277,7 @@ impl Device { SampleFormat::F32, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTFloat32MSB, SampleFormat::F32) => { @@ -238,6 +290,7 @@ impl Device { SampleFormat::F32, from_be, hardware_input_latency, + callback_instant, ); } @@ -251,6 +304,7 @@ impl Device { SampleFormat::I32, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt32MSB, SampleFormat::I32) => { @@ -263,6 +317,7 @@ impl Device { SampleFormat::I32, from_be, hardware_input_latency, + callback_instant, ); } @@ -276,6 +331,7 @@ impl Device { SampleFormat::F64, from_le, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTFloat64MSB, SampleFormat::F64) => { @@ -288,6 +344,7 @@ impl Device { SampleFormat::F64, from_be, hardware_input_latency, + callback_instant, ); } @@ -300,6 +357,7 @@ impl Device { config.sample_rate, true, hardware_input_latency, + callback_instant, ); } (&sys::AsioSampleType::ASIOSTInt24MSB, SampleFormat::I24) => { @@ -311,6 +369,7 @@ impl Device { config.sample_rate, false, hardware_input_latency, + callback_instant, ); } @@ -333,6 +392,7 @@ impl Device { asio_streams, callback_id, driver_event_callback_id, + time_base: Arc::clone(&time_base), }) } @@ -379,10 +439,10 @@ impl Device { // Query hardware output latency (order matters: needs buffers created above). // Wrapped in Arc so the message callback can update it on // kAsioLatenciesChanged without touching the buffer callback. - let hardware_output_latency = Arc::new(AtomicUsize::new( + let hardware_output_latency = Arc::new(AtomicU32::new( driver .latencies() - .map(|latencies| latencies.output.max(0) as usize) + .map(|latencies| latencies.output.max(0) as u32) .unwrap_or(0), )); @@ -399,6 +459,9 @@ impl Device { let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; + let time_base = Arc::new(TimeBase::new()); + let time_base_cb = Arc::clone(&time_base); + let callback_id = driver.add_callback(move |callback_info| unsafe { // If not playing, return early. if !playing.load(Ordering::Acquire) { @@ -432,7 +495,10 @@ impl Device { ); } - let hardware_output_latency = hardware_output_latency.load(Ordering::Relaxed); + let hardware_output_latency = hardware_output_latency.load(Ordering::Relaxed) as usize; + + let callback_instant = + time_base_cb.to_stream_instant((callback_info.system_time / 1_000_000) as u32); // Silence the ASIO buffer that is about to be used. // @@ -460,6 +526,7 @@ impl Device { format: SampleFormat, mix_samples: F, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where A: Copy, D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, @@ -469,7 +536,7 @@ impl Device { apply_output_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -504,6 +571,7 @@ impl Device { from_le(old_sample).saturating_add(new_sample).to_le() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::I16, &sys::AsioSampleType::ASIOSTInt16MSB) => { @@ -519,6 +587,7 @@ impl Device { from_be(old_sample).saturating_add(new_sample).to_be() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::F32, &sys::AsioSampleType::ASIOSTFloat32LSB) => { @@ -536,6 +605,7 @@ impl Device { .to_le() }, hardware_output_latency, + callback_instant, ); } @@ -554,6 +624,7 @@ impl Device { .to_be() }, hardware_output_latency, + callback_instant, ); } @@ -570,6 +641,7 @@ impl Device { from_le(old_sample).saturating_add(new_sample).to_le() }, hardware_output_latency, + callback_instant, ); } (SampleFormat::I32, &sys::AsioSampleType::ASIOSTInt32MSB) => { @@ -585,6 +657,7 @@ impl Device { from_be(old_sample).saturating_add(new_sample).to_be() }, hardware_output_latency, + callback_instant, ); } @@ -603,6 +676,7 @@ impl Device { .to_le() }, hardware_output_latency, + callback_instant, ); } @@ -621,6 +695,7 @@ impl Device { .to_be() }, hardware_output_latency, + callback_instant, ); } @@ -634,6 +709,7 @@ impl Device { callback_info, config.sample_rate, hardware_output_latency, + callback_instant, ); } @@ -647,6 +723,7 @@ impl Device { callback_info, config.sample_rate, hardware_output_latency, + callback_instant, ); } @@ -669,6 +746,7 @@ impl Device { asio_streams, callback_id, driver_event_callback_id, + time_base: Arc::clone(&time_base), }) } @@ -764,7 +842,7 @@ impl Device { &self, driver: &sys::Driver, error_callback: E, - hardware_latency: Arc, + hardware_latency: Arc, is_input: bool, ) -> sys::DriverEventCallbackId where @@ -807,7 +885,7 @@ impl Device { } else { latencies.output }; - hardware_latency.store(latency.max(0) as usize, Ordering::Relaxed); + hardware_latency.store(latency.max(0) as u32, Ordering::Relaxed); } false } @@ -1007,6 +1085,7 @@ unsafe fn process_output_callback_i24( asio_info: &sys::CallbackInfo, sample_rate: crate::SampleRate, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, { @@ -1015,7 +1094,7 @@ unsafe fn process_output_callback_i24( apply_output_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -1069,6 +1148,7 @@ unsafe fn process_output_callback_i24( } } +#[allow(clippy::too_many_arguments)] unsafe fn process_input_callback_i24( data_callback: &mut D, interleaved: &mut [u8], @@ -1077,6 +1157,7 @@ unsafe fn process_input_callback_i24( sample_rate: crate::SampleRate, little_endian: bool, hardware_latency_frames: usize, + callback_instant: StreamInstant, ) where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, { @@ -1112,7 +1193,7 @@ unsafe fn process_input_callback_i24( apply_input_callback_to_data::( data_callback, interleaved, - asio_info, + callback_instant, sample_rate, format, hardware_latency_frames, @@ -1123,7 +1204,7 @@ unsafe fn process_input_callback_i24( unsafe fn apply_output_callback_to_data( data_callback: &mut D, interleaved: &mut [A], - asio_info: &sys::CallbackInfo, + callback_instant: StreamInstant, sample_rate: crate::SampleRate, sample_format: SampleFormat, hardware_latency_frames: usize, @@ -1136,13 +1217,12 @@ unsafe fn apply_output_callback_to_data( interleaved.len(), sample_format, ); - let callback = crate::StreamInstant::from_nanos_i128(asio_info.system_time as i128) - .expect("`system_time` out of range of `StreamInstant` representation"); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let playback = callback_instant + delay; + let timestamp = crate::OutputStreamTimestamp { + callback: callback_instant, + playback, + }; let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); } @@ -1151,7 +1231,7 @@ unsafe fn apply_output_callback_to_data( unsafe fn apply_input_callback_to_data( data_callback: &mut D, interleaved: &mut [A], - asio_info: &sys::CallbackInfo, + callback_instant: StreamInstant, sample_rate: crate::SampleRate, format: SampleFormat, hardware_latency_frames: usize, @@ -1164,13 +1244,14 @@ unsafe fn apply_input_callback_to_data( interleaved.len(), format, ); - let callback = crate::StreamInstant::from_nanos_i128(asio_info.system_time as i128) - .expect("`system_time` out of range of `StreamInstant` representation"); let delay = frames_to_duration(hardware_latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); - let timestamp = crate::InputStreamTimestamp { callback, capture }; + let capture = callback_instant + .checked_sub(delay) + .unwrap_or(StreamInstant::ZERO); + let timestamp = crate::InputStreamTimestamp { + callback: callback_instant, + capture, + }; let info = InputCallbackInfo { timestamp }; data_callback(&data, &info); } diff --git a/src/host/audioworklet/mod.rs b/src/host/audioworklet/mod.rs index 3972d385c..26154a56b 100644 --- a/src/host/audioworklet/mod.rs +++ b/src/host/audioworklet/mod.rs @@ -14,7 +14,7 @@ use crate::{ BackendSpecificError, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, - SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, + SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; @@ -243,7 +243,11 @@ impl DeviceTrait for Device { .unwrap_or(0.0); let total_output_latency_secs = { let sum = base_latency_secs + output_latency_secs; - if sum.is_finite() { sum.max(0.0) } else { 0.0 } + if sum.is_finite() { + sum.max(0.0) + } else { + 0.0 + } }; options.set_processor_options(Some(&js_sys::Array::of3( @@ -256,13 +260,11 @@ impl DeviceTrait for Device { Data::from_parts(data, interleaved_data.len(), sample_format) }; - let callback = crate::StreamInstant::from_secs_f64(now); + let callback = StreamInstant::from_secs_f64(now); let buffer_duration = frames_to_duration(frame_size as _, sample_rate); let playback = callback - .add(buffer_duration + Duration::from_secs_f64(total_output_latency_secs)) - .expect( - "`playback` occurs beyond representation supported by `StreamInstant`", - ); + + (buffer_duration + + Duration::from_secs_f64(total_output_latency_secs)); let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; (data_callback)(&mut data, &info); @@ -319,6 +321,10 @@ impl StreamTrait for Stream { } } } + + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.audio_context.current_time()) + } } impl Drop for Stream { diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index 750b5d0d3..5fcd4af33 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -16,8 +16,9 @@ use crate::{ BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, - SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, + PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, }; use self::enumerate::{ @@ -279,6 +280,11 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> StreamInstant { + let m_host_time = unsafe { mach2::mach_time::mach_absolute_time() }; + host_time_to_stream_instant(m_host_time).expect("mach_timebase_info failed") + } + fn buffer_size(&self) -> Option { Some(get_device_buffer_frames() as crate::FrameCount) } @@ -501,9 +507,7 @@ where } }); let delay = frames_to_duration(latency_frames, sample_rate); - let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let capture = callback.checked_sub(delay).unwrap_or(StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -552,9 +556,7 @@ where } }); let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + delay; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 7267d9d50..7793588ce 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -797,8 +797,8 @@ impl Device { device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); let capture = callback - .sub(delay) - .expect("`capture` occurs before origin of alsa `StreamInstant`"); + .checked_sub(delay) + .unwrap_or(crate::StreamInstant::ZERO); let timestamp = crate::InputStreamTimestamp { callback, capture }; let info = InputCallbackInfo { timestamp }; @@ -896,9 +896,7 @@ impl Device { let latency_frames = device_buffer_frames.unwrap_or(buffer_frames) + extra_latency_frames; let delay = frames_to_duration(latency_frames, sample_rate); - let playback = callback - .add(delay) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + delay; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index d94525037..b0bb9499d 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -263,6 +263,11 @@ impl StreamTrait for Stream { stream.pause() } + fn now(&self) -> crate::StreamInstant { + let m_host_time = unsafe { mach2::mach_time::mach_absolute_time() }; + host_time_to_stream_instant(m_host_time).expect("mach_timebase_info failed") + } + fn buffer_size(&self) -> Option { let stream = self.inner.lock().ok()?; diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index d982ca721..3255a7992 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -81,9 +81,11 @@ fn host_time_to_stream_instant( let res = unsafe { mach2::mach_time::mach_timebase_info(&mut info) }; check_os_status(res)?; let nanos = m_host_time as u128 * info.numer as u128 / info.denom as u128; - crate::StreamInstant::from_nanos_i128(nanos as i128).ok_or(BackendSpecificError { - description: "host time out of range of `StreamInstant` representation".to_string(), - }) + let secs = u64::try_from(nanos / 1_000_000_000).map_err(|_| BackendSpecificError { + description: "mach absolute time overflow".to_string(), + })?; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + Ok(crate::StreamInstant::new(secs, subsec_nanos)) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/custom/mod.rs b/src/host/custom/mod.rs index 81f7d3018..7b7ebbbd1 100644 --- a/src/host/custom/mod.rs +++ b/src/host/custom/mod.rs @@ -7,7 +7,7 @@ use crate::traits::{DeviceTrait, HostTrait, StreamTrait}; use crate::{ BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleFormat, StreamConfig, StreamError, SupportedStreamConfig, + PlayStreamError, SampleFormat, StreamConfig, StreamError, StreamInstant, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; use core::time::Duration; @@ -177,6 +177,7 @@ trait DeviceErased: Send + Sync { trait StreamErased: Send + Sync { fn play(&self) -> Result<(), PlayStreamError>; fn pause(&self) -> Result<(), PauseStreamError>; + fn now(&self) -> StreamInstant; } fn device_to_erased(d: impl DeviceErased + 'static) -> Device { @@ -312,6 +313,10 @@ where fn pause(&self) -> Result<(), PauseStreamError> { ::pause(self) } + + fn now(&self) -> StreamInstant { + ::now(self) + } } // implementations of HostTrait, DeviceTrait, and StreamTrait for custom versions @@ -435,4 +440,8 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), PauseStreamError> { self.0.pause() } + + fn now(&self) -> StreamInstant { + self.0.now() + } } diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index c43119996..18d9f8793 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -15,8 +15,8 @@ use crate::{ BufferSize, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, - SampleRate, StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig, - SupportedStreamConfigRange, SupportedStreamConfigsError, + SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, + SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; // The emscripten backend currently works by instantiating an `AudioContext` object per `Stream`. @@ -278,6 +278,10 @@ impl StreamTrait for Stream { }); Ok(()) } + + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.audio_ctxt.current_time()) + } } fn audio_callback_fn( @@ -299,15 +303,13 @@ where let data = temporary_buffer.as_mut_ptr() as *mut (); let mut data = unsafe { Data::from_parts(data, len, sample_format) }; let now_secs: f64 = audio_ctxt.current_time(); - let callback = crate::StreamInstant::from_secs_f64(now_secs); + let callback = StreamInstant::from_secs_f64(now_secs); // TODO: Use proper latency instead. Currently, unsupported on most browsers though, so // we estimate based on buffer size instead. Probably should use this, but it's only // supported by firefox (2020-04-28). // let latency_secs: f64 = audio_ctxt.outputLatency.try_into().unwrap(); let buffer_duration = frames_to_duration(len, sample_rate as usize); - let playback = callback - .add(buffer_duration) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + buffer_duration; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index dab1cb995..6659cea2b 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use crate::{ BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, - PlayStreamError, SampleRate, StreamError, + PlayStreamError, SampleRate, StreamError, StreamInstant, }; use super::JACK_SAMPLE_FORMAT; @@ -221,6 +221,10 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> StreamInstant { + micros_to_stream_instant(self.async_client.as_client().time()) + } + fn buffer_size(&self) -> Option { Some(self.async_client.as_client().buffer_size() as crate::FrameCount) } @@ -243,7 +247,6 @@ struct LocalProcessHandler { temp_input_buffer: Vec, temp_output_buffer: Vec, playing: Arc, - creation_timestamp: std::time::Instant, /// This should not be called on `process`, only on `buffer_size` because it can block. error_callback_ptr: ErrorCallbackPtr, } @@ -274,7 +277,6 @@ impl LocalProcessHandler { temp_input_buffer, temp_output_buffer, playing, - creation_timestamp: std::time::Instant::now(), error_callback_ptr, } } @@ -288,7 +290,11 @@ fn temp_buffer_to_data(temp_input_buffer: &mut [f32], total_buffer_size: usize) } impl jack::ProcessHandler for LocalProcessHandler { - fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { + fn process( + &mut self, + client: &jack::Client, + process_scope: &jack::ProcessScope, + ) -> jack::Control { if !self.playing.load(Ordering::SeqCst) { return jack::Control::Continue; } @@ -301,20 +307,18 @@ impl jack::ProcessHandler for LocalProcessHandler { let (current_start_usecs, next_usecs_opt) = match process_scope.cycle_times() { Ok(times) => (times.current_usecs, Some(times.next_usecs)), Err(_) => { - // jack was unable to get the current time information - // Fall back to using Instants - let now = std::time::Instant::now(); - let duration = now.duration_since(self.creation_timestamp); - (duration.as_micros() as u64, None) + // JACK was unable to get the current time information. + // Fall back to jack_get_time(), which is the same clock source + // used by now() and cycle_times(), so the epoch stays consistent. + (client.time(), None) } }; let start_cycle_instant = micros_to_stream_instant(current_start_usecs); let start_callback_instant = start_cycle_instant - .add(frames_to_duration( + + frames_to_duration( process_scope.frames_since_cycle_start() as usize, self.sample_rate, - )) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + ); if let Some(input_callback) = &mut self.input_data_callback { // Let's get the data from the input ports and run the callback @@ -357,9 +361,9 @@ impl jack::ProcessHandler for LocalProcessHandler { // exact instant at which the last sample written here will be consumed by the device. let playback = match next_usecs_opt { Some(next_usecs) => micros_to_stream_instant(next_usecs), - None => start_cycle_instant - .add(frames_to_duration(current_frame_count, self.sample_rate)) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"), + None => { + start_cycle_instant + frames_to_duration(current_frame_count, self.sample_rate) + } }; let timestamp = crate::OutputStreamTimestamp { callback, playback }; let info = crate::OutputCallbackInfo { timestamp }; @@ -399,9 +403,8 @@ impl jack::ProcessHandler for LocalProcessHandler { } } -fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { - crate::StreamInstant::from_nanos_i128(micros as i128 * 1_000) - .expect("`micros` out of range of `StreamInstant` representation") +fn micros_to_stream_instant(micros: u64) -> StreamInstant { + StreamInstant::from_micros(micros) } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. diff --git a/src/host/null/mod.rs b/src/host/null/mod.rs index a42ae9bd2..ac5cbe57f 100644 --- a/src/host/null/mod.rs +++ b/src/host/null/mod.rs @@ -143,6 +143,10 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), PauseStreamError> { unimplemented!() } + + fn now(&self) -> crate::StreamInstant { + unimplemented!() + } } impl Iterator for Devices { diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 9f41bd4c4..314fa0fcf 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -4,7 +4,6 @@ use std::{ Arc, }, thread::JoinHandle, - time::Instant, }; use crate::{ @@ -68,6 +67,10 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> crate::StreamInstant { + monotonic_stream_instant().expect("clock_gettime failed") + } + fn buffer_size(&self) -> Option { match self.last_quantum.load(Ordering::Relaxed) { 0 => None, @@ -142,7 +145,6 @@ pub struct UserData { error_callback: E, sample_format: SampleFormat, format: pw::spa::param::audio::AudioInfoRaw, - created_instance: Instant, last_quantum: Arc, } impl UserData @@ -186,7 +188,7 @@ fn pw_stream_time(stream: &pw::stream::Stream) -> Option { std::mem::size_of::(), ) }; - if rc != 0 || t.now == 0 || t.rate.denom == 0 { + if rc != 0 || t.now <= 0 || t.rate.denom == 0 { return None; } debug_assert_eq!(t.rate.num, 1, "unexpected pw_time rate.num"); @@ -211,19 +213,17 @@ where self.last_quantum.store(frames as u64, Ordering::Relaxed); let (callback, capture) = match pw_stream_time(stream) { Some(PwTime { now_ns, delay_ns }) => ( - StreamInstant::from_nanos(now_ns), - StreamInstant::from_nanos(now_ns - delay_ns), + StreamInstant::from_nanos(now_ns as u64), + StreamInstant::from_nanos((now_ns - delay_ns.max(0)) as u64), ), None => { - let cb = stream_timestamp_fallback(self.created_instance)?; - let pl = cb - .sub(frames_to_duration(frames, self.format.rate())) - .ok_or_else(|| BackendSpecificError { - description: - "`capture` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; - (cb, pl) + let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { + description: "clock_gettime failed".to_owned(), + })?; + let capture = cb + .checked_sub(frames_to_duration(frames, self.format.rate())) + .unwrap_or(crate::StreamInstant::ZERO); + (cb, capture) } }; let timestamp = crate::InputStreamTimestamp { callback, capture }; @@ -246,18 +246,14 @@ where self.last_quantum.store(frames as u64, Ordering::Relaxed); let (callback, playback) = match pw_stream_time(stream) { Some(PwTime { now_ns, delay_ns }) => ( - StreamInstant::from_nanos(now_ns), - StreamInstant::from_nanos(now_ns + delay_ns), + StreamInstant::from_nanos(now_ns as u64), + StreamInstant::from_nanos((now_ns + delay_ns.max(0)) as u64), ), None => { - let cb = stream_timestamp_fallback(self.created_instance)?; - let pl = cb - .add(frames_to_duration(frames, self.format.rate())) - .ok_or_else(|| BackendSpecificError { - description: - "`playback` occurs beyond representation supported by `StreamInstant`" - .to_string(), - })?; + let cb = monotonic_stream_instant().ok_or_else(|| BackendSpecificError { + description: "clock_gettime failed".to_owned(), + })?; + let pl = cb + frames_to_duration(frames, self.format.rate()); (cb, pl) } }; @@ -274,18 +270,22 @@ pub struct StreamData { pub context: ContextRc, } -// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. -// -// This ensures positive values that are compatible with our `StreamInstant` representation. -#[inline] -fn stream_timestamp_fallback( - creation: std::time::Instant, -) -> Result { - let now = std::time::Instant::now(); - let duration = now.duration_since(creation); - StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { - description: "stream duration has exceeded `StreamInstant` representation".to_string(), - }) +/// Read `clock_gettime` and return it as a [`StreamInstant`]. +/// +/// This is the same clock used by `pw_stream_get_time_n` (`pw_time.now`), so values +/// returned here are directly comparable with the `callback`/`capture`/`playback` +/// instants delivered to the data callback. +fn monotonic_stream_instant() -> Option { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + if rc == 0 { + Some(StreamInstant::new(ts.tv_sec as u64, ts.tv_nsec as u32)) + } else { + None + } } // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. @@ -319,7 +319,6 @@ where error_callback, sample_format, format: Default::default(), - created_instance: Instant::now(), last_quantum, }; let channels = config.channels as _; @@ -471,7 +470,6 @@ where error_callback, sample_format, format: Default::default(), - created_instance: Instant::now(), last_quantum, }; diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 0d63b3c20..f55c21968 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -18,17 +18,17 @@ use crate::{ const LATENCY_POLL_INTERVAL: Duration = Duration::from_millis(5); pub enum Stream { - Playback(pulseaudio::PlaybackStream), - Record(pulseaudio::RecordStream), + Playback(pulseaudio::PlaybackStream, Instant), + Record(pulseaudio::RecordStream, Instant), } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { match self { - Stream::Playback(stream) => { + Stream::Playback(stream, _) => { block_on(stream.uncork()).map_err(Into::::into)?; } - Stream::Record(stream) => { + Stream::Record(stream, _) => { block_on(stream.uncork()).map_err(Into::::into)?; block_on(stream.started()).map_err(Into::::into)?; } @@ -39,21 +39,29 @@ impl StreamTrait for Stream { fn pause(&self) -> Result<(), crate::PauseStreamError> { let res = match self { - Stream::Playback(stream) => block_on(stream.cork()), - Stream::Record(stream) => block_on(stream.cork()), + Stream::Playback(stream, _) => block_on(stream.cork()), + Stream::Record(stream, _) => block_on(stream.cork()), }; res.map_err(Into::::into)?; Ok(()) } + fn now(&self) -> crate::StreamInstant { + let start = match self { + Stream::Playback(_, start) | Stream::Record(_, start) => *start, + }; + let elapsed = start.elapsed(); + StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()) + } + fn buffer_size(&self) -> Option { let (spec, bytes) = match self { - Stream::Playback(s) => ( + Stream::Playback(s, _) => ( s.sample_spec(), s.buffer_attr().minimum_request_length as usize, ), - Stream::Record(s) => (s.sample_spec(), s.buffer_attr().fragment_size as usize), + Stream::Record(s, _) => (s.sample_spec(), s.buffer_attr().fragment_size as usize), }; let frame_size = spec.channels as usize * spec.format.bytes_per_sample(); if bytes > 0 { @@ -75,8 +83,7 @@ impl Stream { D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - // Use a monotonic clock relative to stream creation for StreamInstants. - let start = std::time::Instant::now(); + let start = Instant::now(); let current_latency_micros = Arc::new(AtomicU64::new(0)); // Microseconds since stream creation at the time of the last latency poll, used @@ -122,14 +129,8 @@ impl Stream { let playback_time = elapsed + Duration::from_micros(latency); let timestamp = OutputStreamTimestamp { - callback: StreamInstant { - secs: elapsed.as_secs() as i64, - nanos: elapsed.subsec_nanos(), - }, - playback: StreamInstant { - secs: playback_time.as_secs() as i64, - nanos: playback_time.subsec_nanos(), - }, + callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()), + playback: StreamInstant::new(playback_time.as_secs(), playback_time.subsec_nanos()), }; // Preemptively fill the buffer with silence in case the user @@ -202,7 +203,7 @@ impl Stream { std::thread::sleep(LATENCY_POLL_INTERVAL); }); - Ok(Self::Playback(stream)) + Ok(Self::Playback(stream, start)) } pub fn new_record( @@ -234,14 +235,8 @@ impl Stream { .unwrap_or_default(); let timestamp = InputStreamTimestamp { - callback: StreamInstant { - secs: elapsed.as_secs() as i64, - nanos: elapsed.subsec_nanos(), - }, - capture: StreamInstant { - secs: capture_time.as_secs() as i64, - nanos: capture_time.subsec_nanos(), - }, + callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()), + capture: StreamInstant::new(capture_time.as_secs(), capture_time.subsec_nanos()), }; let bps = sample_spec.format.bytes_per_sample(); @@ -286,7 +281,7 @@ impl Stream { std::thread::sleep(LATENCY_POLL_INTERVAL); }); - Ok(Self::Record(stream)) + Ok(Self::Record(stream, start)) } } diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 03d694f05..d0aa8a8f8 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -2,7 +2,7 @@ use super::windows_err_to_cpal_err; use crate::traits::StreamTrait; use crate::{ BackendSpecificError, BufferSize, Data, FrameCount, InputCallbackInfo, OutputCallbackInfo, - PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamError, + PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamError, StreamInstant, }; use std::mem; use std::ptr; @@ -12,6 +12,7 @@ use std::time::Duration; use windows::Win32::Foundation; use windows::Win32::Foundation::WAIT_OBJECT_0; use windows::Win32::Media::Audio; +use windows::Win32::System::Performance; use windows::Win32::System::SystemServices; use windows::Win32::System::Threading; @@ -33,6 +34,9 @@ pub struct Stream { // Callback size in frames. period_frames: FrameCount, + + // QueryPerformanceFrequency result, cached at construction (constant for the system lifetime). + qpc_frequency: u64, } // SAFETY: Windows Event HANDLEs are safe to send between threads - they are designed for @@ -124,6 +128,12 @@ impl Stream { let (tx, rx) = channel(); let period_frames = stream_inner.period_frames; + let mut qpc_frequency: i64 = 0; + unsafe { + Performance::QueryPerformanceFrequency(&mut qpc_frequency) + .expect("QueryPerformanceFrequency failed"); + debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero"); + } let run_context = RunContext { handles: vec![pending_scheduled_event, stream_inner.event], @@ -141,6 +151,7 @@ impl Stream { commands: tx, pending_scheduled_event, period_frames, + qpc_frequency: qpc_frequency as u64, } } @@ -160,6 +171,12 @@ impl Stream { let (tx, rx) = channel(); let period_frames = stream_inner.period_frames; + let mut qpc_frequency: i64 = 0; + unsafe { + Performance::QueryPerformanceFrequency(&mut qpc_frequency) + .expect("QueryPerformanceFrequency failed"); + debug_assert_ne!(qpc_frequency, 0, "QueryPerformanceFrequency returned zero"); + } let run_context = RunContext { handles: vec![pending_scheduled_event, stream_inner.event], @@ -177,6 +194,7 @@ impl Stream { commands: tx, pending_scheduled_event, period_frames, + qpc_frequency: qpc_frequency as u64, } } @@ -215,6 +233,23 @@ impl StreamTrait for Stream { Ok(()) } + fn now(&self) -> StreamInstant { + let mut counter: i64 = 0; + unsafe { + Performance::QueryPerformanceCounter(&mut counter) + .expect("QueryPerformanceCounter failed"); + } + // Convert to 100-nanosecond units first, matching the precision of WASAPI QPCPosition + // values delivered to callbacks. This keeps `now()` on the same 100 ns grid as + // callback/capture/playback instants, avoiding false sub-100 ns deltas. + let units_100ns = counter as u128 * 10_000_000 / self.qpc_frequency as u128; + let nanos = units_100ns * 100; + StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ) + } + fn buffer_size(&self) -> Option { Some(self.period_frames) } @@ -561,7 +596,7 @@ fn frames_to_duration(frames: FrameCount, rate: SampleRate) -> Duration { /// Use the stream's `IAudioClock` to produce the current stream instant. /// /// Uses the QPC position produced via the `GetPosition` method. -fn stream_instant(stream: &StreamInner) -> Result { +fn stream_instant(stream: &StreamInner) -> Result { let mut position: u64 = 0; let mut qpc_position: u64 = 0; unsafe { @@ -570,10 +605,12 @@ fn stream_instant(stream: &StreamInner) -> Result)?; }; - // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = qpc_position as i128 * 100; - let instant = crate::StreamInstant::from_nanos_i128(qpc_nanos) - .expect("performance counter out of range of `StreamInstant` representation"); + // The `qpc_position` is in 100-nanosecond units. + let nanos = qpc_position as u128 * 100; + let instant = StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ); Ok(instant) } @@ -586,10 +623,12 @@ fn input_timestamp( stream: &StreamInner, buffer_qpc_position: u64, ) -> Result { - // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. - let qpc_nanos = buffer_qpc_position as i128 * 100; - let capture = crate::StreamInstant::from_nanos_i128(qpc_nanos) - .expect("performance counter out of range of `StreamInstant` representation"); + // The `qpc_position` is in 100-nanosecond units. + let nanos = buffer_qpc_position as u128 * 100; + let capture = StreamInstant::new( + (nanos / 1_000_000_000) as u64, + (nanos % 1_000_000_000) as u32, + ); let callback = stream_instant(stream)?; Ok(crate::InputStreamTimestamp { capture, callback }) } @@ -609,8 +648,6 @@ fn output_timestamp( // `padding` is the number of frames already queued in the endpoint buffer ahead of the // frames we are about to write. Those frames must drain before ours are heard. let padding = stream.max_frames_in_buffer - frames_available; - let playback = callback - .add(frames_to_duration(padding, sample_rate) + stream.stream_latency) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let playback = callback + (frames_to_duration(padding, sample_rate) + stream.stream_latency); Ok(crate::OutputStreamTimestamp { callback, playback }) } diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index fe8f1a457..745b500dd 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -14,7 +14,7 @@ use crate::{ BackendSpecificError, BufferSize, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, - SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize, + SampleFormat, SampleRate, StreamConfig, StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; use std::ops::DerefMut; @@ -353,8 +353,8 @@ impl DeviceTrait for Device { 0.0 } }; - let callback = crate::StreamInstant::from_secs_f64(now); - let playback = crate::StreamInstant::from_secs_f64( + let callback = StreamInstant::from_secs_f64(now); + let playback = StreamInstant::from_secs_f64( time_at_start_of_buffer + total_hw_latency_secs, ); let timestamp = crate::OutputStreamTimestamp { callback, playback }; @@ -496,6 +496,10 @@ impl StreamTrait for Stream { } } + fn now(&self) -> StreamInstant { + StreamInstant::from_secs_f64(self.ctx.current_time()) + } + fn buffer_size(&self) -> Option { Some(self.buffer_size_frames as crate::FrameCount) } diff --git a/src/lib.rs b/src/lib.rs index 71787ccfc..3b1a572bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,14 +188,13 @@ pub use platform::{ SupportedInputConfigs, SupportedOutputConfigs, ALL_HOSTS, }; pub use samples_formats::{FromSample, Sample, SampleFormat, SizedSample, I24, U24}; -use std::convert::TryInto; -use std::time::Duration; pub mod device_description; mod error; mod host; pub mod platform; mod samples_formats; +mod timestamp; pub mod traits; /// Iterator of devices wrapped in a filter to only include certain device types @@ -469,66 +468,10 @@ pub struct Data { sample_format: SampleFormat, } -/// A monotonic time instance associated with a stream, retrieved from either: -/// -/// 1. A timestamp provided to the stream's underlying audio data callback or -/// 2. The same time source used to generate timestamps for a stream's underlying audio data -/// callback. -/// -/// `StreamInstant` represents a duration since an unspecified origin point. The origin -/// is guaranteed to occur at or before the stream starts, and remains consistent for the -/// lifetime of that stream. Different streams may have different origins. -/// -/// ## Host `StreamInstant` Sources -/// -/// | Host | Source | -/// | ---- | ------ | -/// | alsa | `snd_pcm_status_get_htstamp` | -/// | asio | `timeGetTime` | -/// | coreaudio | `mach_absolute_time` | -/// | emscripten | `AudioContext.getOutputTimestamp` | -/// | pulseaudio | `std::time::Instant` | -/// | wasapi | `QueryPerformanceCounter` | -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub struct StreamInstant { - secs: i64, - nanos: u32, -} - -/// A timestamp associated with a call to an input stream's data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct InputStreamTimestamp { - /// The instant the stream's data callback was invoked. - pub callback: StreamInstant, - /// The instant that data was captured from the device. - /// - /// E.g. The instant data was read from an ADC. - pub capture: StreamInstant, -} - -/// A timestamp associated with a call to an output stream's data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct OutputStreamTimestamp { - /// The instant the stream's data callback was invoked. - pub callback: StreamInstant, - /// The predicted instant that data written will be delivered to the device for playback. - /// - /// E.g. The instant data will be played by a DAC. - pub playback: StreamInstant, -} - -/// Information relevant to a single call to the user's input stream data callback. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct InputCallbackInfo { - timestamp: InputStreamTimestamp, -} - -/// Information relevant to a single call to the user's output stream data callback. -#[cfg_attr(target_os = "emscripten", wasm_bindgen)] -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct OutputCallbackInfo { - timestamp: OutputStreamTimestamp, -} +pub use timestamp::{ + InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, + StreamInstant, +}; impl SupportedStreamConfig { pub fn new( @@ -570,99 +513,6 @@ impl SupportedStreamConfig { } } -impl StreamInstant { - /// The amount of time elapsed from another instant to this one. - /// - /// Returns `None` if `earlier` is later than self. - pub fn duration_since(&self, earlier: &Self) -> Option { - if self < earlier { - None - } else { - (self.as_nanos() - earlier.as_nanos()) - .try_into() - .ok() - .map(Duration::from_nanos) - } - } - - /// Returns the instant in time after the given duration has passed. - /// - /// Returns `None` if the resulting instant would exceed the bounds of the underlying data - /// structure. - pub fn add(&self, duration: Duration) -> Option { - self.as_nanos() - .checked_add(duration.as_nanos() as i128) - .and_then(Self::from_nanos_i128) - } - - /// Returns the instant in time one `duration` ago. - /// - /// Returns `None` if the resulting instant would underflow. As a result, it is important to - /// consider that on some platforms the [`StreamInstant`] may begin at `0` from the moment the - /// source stream is created. - pub fn sub(&self, duration: Duration) -> Option { - self.as_nanos() - .checked_sub(duration.as_nanos() as i128) - .and_then(Self::from_nanos_i128) - } - - fn as_nanos(&self) -> i128 { - (self.secs as i128 * 1_000_000_000) + self.nanos as i128 - } - - #[allow(dead_code)] - fn from_nanos(nanos: i64) -> Self { - let secs = nanos / 1_000_000_000; - let subsec_nanos = nanos - secs * 1_000_000_000; - Self::new(secs, subsec_nanos as u32) - } - - #[allow(dead_code)] - fn from_nanos_i128(nanos: i128) -> Option { - let secs = nanos / 1_000_000_000; - if secs > i64::MAX as i128 || secs < i64::MIN as i128 { - None - } else { - let subsec_nanos = nanos - secs * 1_000_000_000; - debug_assert!(subsec_nanos < u32::MAX as i128); - Some(Self::new(secs as i64, subsec_nanos as u32)) - } - } - - #[allow(dead_code)] - fn from_secs_f64(secs: f64) -> crate::StreamInstant { - let s = secs.floor() as i64; - let ns = ((secs - s as f64) * 1_000_000_000.0) as u32; - Self::new(s, ns) - } - - pub fn new(secs: i64, nanos: u32) -> Self { - StreamInstant { secs, nanos } - } -} - -impl InputCallbackInfo { - pub fn new(timestamp: InputStreamTimestamp) -> Self { - Self { timestamp } - } - - /// The timestamp associated with the call to an input stream's data callback. - pub fn timestamp(&self) -> InputStreamTimestamp { - self.timestamp - } -} - -impl OutputCallbackInfo { - pub fn new(timestamp: OutputStreamTimestamp) -> Self { - Self { timestamp } - } - - /// The timestamp associated with the call to an output stream's data callback. - pub fn timestamp(&self) -> OutputStreamTimestamp { - self.timestamp - } -} - // Note: Data does not implement `is_empty()` because it always contains a valid audio buffer // by design. The buffer may contain silence, but it is never structurally empty. #[allow(clippy::len_without_is_empty)] @@ -1005,37 +855,3 @@ pub(crate) const COMMON_SAMPLE_RATES: &[SampleRate] = &[ 5512, 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000, 64000, 88200, 96000, 176400, 192000, 352800, 384000, 705600, 768000, 1411200, 1536000, ]; - -#[test] -fn test_stream_instant() { - let a = StreamInstant::new(2, 0); - let b = StreamInstant::new(-2, 0); - let min = StreamInstant::new(i64::MIN, 0); - let max = StreamInstant::new(i64::MAX, 0); - assert_eq!( - a.sub(Duration::from_secs(1)), - Some(StreamInstant::new(1, 0)) - ); - assert_eq!( - a.sub(Duration::from_secs(2)), - Some(StreamInstant::new(0, 0)) - ); - assert_eq!( - a.sub(Duration::from_secs(3)), - Some(StreamInstant::new(-1, 0)) - ); - assert_eq!(min.sub(Duration::from_secs(1)), None); - assert_eq!( - b.add(Duration::from_secs(1)), - Some(StreamInstant::new(-1, 0)) - ); - assert_eq!( - b.add(Duration::from_secs(2)), - Some(StreamInstant::new(0, 0)) - ); - assert_eq!( - b.add(Duration::from_secs(3)), - Some(StreamInstant::new(1, 0)) - ); - assert_eq!(max.add(Duration::from_secs(1)), None); -} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 116d1b1d0..0a2d953e4 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -614,6 +614,17 @@ macro_rules! impl_platform_host { )* } } + + fn now(&self) -> crate::StreamInstant { + match self.0 { + $( + $(#[cfg($feat)])? + StreamInner::$HostVariant(ref s) => { + s.now() + } + )* + } + } } impl From for Device { diff --git a/src/timestamp.rs b/src/timestamp.rs new file mode 100644 index 000000000..783ad1829 --- /dev/null +++ b/src/timestamp.rs @@ -0,0 +1,358 @@ +use std::time::Duration; + +#[cfg(target_os = "emscripten")] +use wasm_bindgen::prelude::*; + +/// A monotonic time instance associated with a stream, retrieved from either: +/// +/// 1. A timestamp provided to the stream's underlying audio data callback or +/// 2. The same time source used to generate timestamps for a stream's underlying audio data +/// callback. +/// +/// `StreamInstant` represents a moment on a stream's monotonic clock. Because the underlying clock +/// is monotonic, `StreamInstant` values are always positive and increasing. +/// +/// Within a single stream, all instants share the same clock, so arithmetic between them is +/// meaningful. Across different streams, origins are not guaranteed to be shared. On some hosts +/// each stream starts its own independent clock at zero, so subtracting a timestamp from one +/// stream and one from another may produce a meaningless result. +/// +/// ## Time sources by host +/// +/// | Host | Time source | +/// | ---- | ----------- | +/// | AAudio | `AAudioStream_getTimestamp(CLOCK_MONOTONIC)` | +/// | ALSA | `snd_pcm_status_get_htstamp()` | +/// | ASIO | `timeGetTime()` | +/// | AudioWorklet | `AudioContext.currentTime` | +/// | CoreAudio | `mach_absolute_time()` | +/// | Emscripten | `AudioContext.currentTime` | +/// | JACK | `jack_get_time()` | +/// | PipeWire | `pw_stream_get_time_n()` | +/// | PulseAudio | `std::time::Instant` | +/// | WASAPI | `QueryPerformanceCounter()` | +/// | WebAudio | `AudioContext.currentTime` | +/// +/// > **Disclaimer:** These system calls might change over time. +/// +/// > **Note:** The `+` and `-` operators on `StreamInstant` may panic if the result cannot be +/// > represented as a `StreamInstant`. Use [`checked_add`][StreamInstant::checked_add] or +/// > [`checked_sub`][StreamInstant::checked_sub] for non-panicking variants. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub struct StreamInstant { + secs: u64, + nanos: u32, +} + +/// A timestamp associated with a call to an input stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct InputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The instant that data was captured from the device. + /// + /// E.g. The instant data was read from an ADC. + pub capture: StreamInstant, +} + +/// A timestamp associated with a call to an output stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The predicted instant that data written will be delivered to the device for playback. + /// + /// E.g. The instant data will be played by a DAC. + pub playback: StreamInstant, +} + +/// Information relevant to a single call to the user's input stream data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct InputCallbackInfo { + pub(crate) timestamp: InputStreamTimestamp, +} + +/// Information relevant to a single call to the user's output stream data callback. +#[cfg_attr(target_os = "emscripten", wasm_bindgen)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputCallbackInfo { + pub(crate) timestamp: OutputStreamTimestamp, +} + +impl StreamInstant { + /// A `StreamInstant` with `secs` and `nanos` both set to zero. + pub const ZERO: Self = Self { secs: 0, nanos: 0 }; + + /// Returns the amount of time elapsed from `earlier` to `self`, or `None` if `earlier` is + /// later than `self`. + pub fn checked_duration_since(&self, earlier: StreamInstant) -> Option { + if self < &earlier { + return None; + } + let delta = self.as_nanos() - earlier.as_nanos(); + let secs = u64::try_from(delta / 1_000_000_000).ok()?; + let subsec_nanos = (delta % 1_000_000_000) as u32; + Some(Duration::new(secs, subsec_nanos)) + } + + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn saturating_duration_since(&self, earlier: StreamInstant) -> Duration { + self.checked_duration_since(earlier).unwrap_or_default() + } + + /// Returns the amount of time elapsed from `earlier` to `self`, saturating to + /// [`Duration::ZERO`] if `earlier` is later than `self`. + pub fn duration_since(&self, earlier: StreamInstant) -> Duration { + self.saturating_duration_since(earlier) + } + + /// Returns `Some(t)` where `t` is `self + duration`, or `None` if the result cannot be + /// represented as a `StreamInstant`. + pub fn checked_add(&self, duration: Duration) -> Option { + let total = self.as_nanos().checked_add(duration.as_nanos())?; + let secs = u64::try_from(total / 1_000_000_000).ok()?; + let nanos = (total % 1_000_000_000) as u32; + Some(StreamInstant { secs, nanos }) + } + + /// Returns `Some(t)` where `t` is `self - duration`, or `None` if the result cannot be + /// represented as a `StreamInstant` (i.e. would be negative). + pub fn checked_sub(&self, duration: Duration) -> Option { + let total = self.as_nanos().checked_sub(duration.as_nanos())?; + let secs = u64::try_from(total / 1_000_000_000).ok()?; + let nanos = (total % 1_000_000_000) as u32; + Some(StreamInstant { secs, nanos }) + } + + /// Returns the total number of nanoseconds contained by this `StreamInstant`. + pub fn as_nanos(&self) -> u128 { + self.secs as u128 * 1_000_000_000 + self.nanos as u128 + } + + /// Creates a new `StreamInstant` from the specified number of nanoseconds. + /// + /// Note: Using this on the return value of `as_nanos()` might cause unexpected behavior: + /// `as_nanos()` returns a `u128`, and can return values that do not fit in `u64`, e.g. 585 + /// years. Instead, consider using the pattern + /// `StreamInstant::new(t.as_secs(), t.subsec_nanos())` if you cannot copy/clone the + /// `StreamInstant` directly. + pub fn from_nanos(nanos: u64) -> Self { + let secs = nanos / 1_000_000_000; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + Self::new(secs, subsec_nanos) + } + + /// Creates a new `StreamInstant` from the specified number of milliseconds. + pub fn from_millis(millis: u64) -> Self { + Self::new(millis / 1_000, (millis % 1_000 * 1_000_000) as u32) + } + + /// Creates a new `StreamInstant` from the specified number of microseconds. + pub fn from_micros(micros: u64) -> Self { + Self::new(micros / 1_000_000, (micros % 1_000_000 * 1_000) as u32) + } + + /// Creates a new `StreamInstant` from the specified number of seconds represented as `f64`. + /// + /// # Panics + /// + /// Panics if `secs` is negative, not finite, or overflows the range of `StreamInstant`. + pub fn from_secs_f64(secs: f64) -> Self { + const NANOS_PER_SEC: u128 = 1_000_000_000; + const MAX_NANOS: f64 = ((u64::MAX as u128 + 1) * NANOS_PER_SEC) as f64; + let nanos = secs * NANOS_PER_SEC as f64; + if !(0.0..MAX_NANOS).contains(&nanos) { + panic!("StreamInstant::from_secs_f64 called with invalid value: {secs}"); + } + let nanos = nanos as u128; + Self::new( + (nanos / NANOS_PER_SEC) as u64, + (nanos % NANOS_PER_SEC) as u32, + ) + } + + /// Creates a new `StreamInstant` from the specified number of whole seconds and additional + /// nanoseconds. + /// + /// If `nanos` is greater than or equal to 1 billion (the number of nanoseconds in a second), + /// the excess carries over into `secs`. + /// + /// # Panics + /// + /// Panics if the carry from `nanos` overflows the seconds counter. + pub fn new(secs: u64, nanos: u32) -> Self { + let carry = nanos / 1_000_000_000; + let subsec_nanos = nanos % 1_000_000_000; + let secs = secs + .checked_add(carry as u64) + .expect("overflow in StreamInstant::new"); + StreamInstant { + secs, + nanos: subsec_nanos, + } + } +} + +impl std::ops::Add for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result overflows the range of `StreamInstant`. Use + /// [`checked_add`][StreamInstant::checked_add] for a non-panicking variant. + #[inline] + fn add(self, rhs: Duration) -> StreamInstant { + self.checked_add(rhs) + .expect("overflow when adding duration to stream instant") + } +} + +impl std::ops::AddAssign for StreamInstant { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl std::ops::Sub for StreamInstant { + type Output = StreamInstant; + + /// # Panics + /// + /// Panics if the result underflows the range of `StreamInstant`. Use + /// [`checked_sub`][StreamInstant::checked_sub] for a non-panicking variant. + #[inline] + fn sub(self, rhs: Duration) -> StreamInstant { + self.checked_sub(rhs) + .expect("overflow when subtracting duration from stream instant") + } +} + +impl std::ops::SubAssign for StreamInstant { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl std::ops::Sub for StreamInstant { + type Output = Duration; + + /// Returns the duration from `rhs` to `self`, saturating to [`Duration::ZERO`] if `rhs` is + /// later than `self`. + #[inline] + fn sub(self, rhs: StreamInstant) -> Duration { + self.saturating_duration_since(rhs) + } +} + +impl InputCallbackInfo { + pub fn new(timestamp: InputStreamTimestamp) -> Self { + Self { timestamp } + } + + /// The timestamp associated with the call to an input stream's data callback. + pub fn timestamp(&self) -> InputStreamTimestamp { + self.timestamp + } +} + +impl OutputCallbackInfo { + pub fn new(timestamp: OutputStreamTimestamp) -> Self { + Self { timestamp } + } + + /// The timestamp associated with the call to an output stream's data callback. + pub fn timestamp(&self) -> OutputStreamTimestamp { + self.timestamp + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stream_instant() { + let z = StreamInstant::ZERO; // origin + let a = StreamInstant::new(2, 0); + let max = StreamInstant::new(u64::MAX, 999_999_999); // largest representable instant + + assert_eq!( + a.checked_sub(Duration::from_secs(1)), + Some(StreamInstant::new(1, 0)) + ); + assert_eq!( + a.checked_sub(Duration::from_secs(2)), + Some(StreamInstant::ZERO) + ); + assert_eq!(a.checked_sub(Duration::from_secs(3)), None); // would go below zero + assert_eq!(z.checked_sub(Duration::from_nanos(1)), None); // underflow at origin + + assert_eq!( + a.checked_add(Duration::from_secs(1)), + Some(StreamInstant::new(3, 0)) + ); + assert_eq!(max.checked_add(Duration::from_nanos(1)), None); // overflow + + assert_eq!(a.duration_since(z), Duration::from_secs(2)); + assert_eq!(z.duration_since(a), Duration::ZERO); // saturates + assert_eq!(a.checked_duration_since(z), Some(Duration::from_secs(2))); + assert_eq!(z.checked_duration_since(a), None); + assert_eq!(a.saturating_duration_since(z), Duration::from_secs(2)); + assert_eq!(z.saturating_duration_since(a), Duration::ZERO); + + assert_eq!(z + Duration::from_secs(2), a); + assert_eq!(a - Duration::from_secs(2), z); + assert_eq!(a - z, Duration::from_secs(2)); + assert_eq!(z - a, Duration::ZERO); // saturates via Sub + let mut c = z; + c += Duration::from_secs(2); + assert_eq!(c, a); + let mut d = a; + d -= Duration::from_secs(2); + assert_eq!(d, z); + + // nanosecond carry + assert_eq!( + StreamInstant::new(1, 1_500_000_000), + StreamInstant::new(2, 500_000_000) + ); + assert_eq!( + StreamInstant::new(0, 1_000_000_000), + StreamInstant::new(1, 0) + ); + + // basic round-trip + assert_eq!( + StreamInstant::from_secs_f64(1.5), + StreamInstant::new(1, 500_000_000) + ); + assert_eq!(StreamInstant::from_secs_f64(0.0), z); + } + + #[test] + #[should_panic] + fn test_stream_instant_new_overflow() { + StreamInstant::new(u64::MAX, 1_000_000_000); // carry overflows u64 + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_negative() { + StreamInstant::from_secs_f64(-1.0); + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_nan() { + StreamInstant::from_secs_f64(f64::NAN); + } + + #[test] + #[should_panic] + fn test_stream_instant_from_secs_f64_infinite() { + StreamInstant::from_secs_f64(f64::INFINITY); + } +} diff --git a/src/traits.rs b/src/traits.rs index 0b0ee0e8c..9314270df 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -11,7 +11,8 @@ use crate::{ BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, PauseStreamError, PlayStreamError, SampleFormat, SizedSample, StreamConfig, - StreamError, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, + StreamError, StreamInstant, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, }; /// A [`Host`] provides access to the available audio devices on the system. @@ -321,6 +322,17 @@ pub trait StreamTrait { fn buffer_size(&self) -> Option { None } + + /// Returns a [`StreamInstant`] representing the current moment on the stream's clock. + /// + /// The clock is **monotonic**: successive calls to `now()` will never return a value earlier + /// than a previous one, and the returned value will never be earlier than any `callback`, + /// `capture`, or `playback` instant already delivered to the stream's data callback. + /// + /// The returned value shares the same time base as the [`StreamInstant`]s delivered to the + /// stream's data callback via [`crate::InputStreamTimestamp::callback`] and + /// [`crate::OutputStreamTimestamp::callback`], so durations between them are meaningful. + fn now(&self) -> StreamInstant; } /// Compile-time assertion that a stream type implements [`Send`].