
Replace tokio::sync::watch with custom Producer/Subscriber#996

Merged
kixelated merged 9 commits into main from fetch-boye on Feb 25, 2026


Conversation

@kixelated
Collaborator

First step toward making a better API.

@coderabbitai
Contributor

coderabbitai bot commented Feb 25, 2026

Walkthrough

APIs across the workspace were made fallible: many producers/consumers (Broadcast/Track/Group/Frame/FrameProducer/etc.) now return Results and use finish/close semantics instead of infallible close. moq-lite internals were refactored to a Producer/Consumer state model with a waiter notification system. New Error variants Dropped and Closed were added. Session::close now takes &mut self. The async-channel dependency was removed from moq-lite and a workspace dependency version was bumped.

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 46.67%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2)
  • Title check — ✅ Passed. The title accurately describes the main architectural change: replacing tokio::sync::watch with a custom Producer/Subscriber pattern across the codebase.
  • Description check — ✅ Passed. The description is vague but related to the changeset, mentioning an API improvement initiative that aligns with the substantial refactoring shown in the file changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
rs/moq-lite/src/session.rs (1)

22-38: ⚠️ Potential issue | 🟠 Major

Double-close regression: close(&mut self) no longer prevents Drop from re-closing the session.

Before this change, close(self) consumed the Session, so the Drop impl never fired after an explicit close call. With &mut self, Drop always runs and calls self.session.close(Cancel, "dropped") a second time regardless.

This is observable at the call site in rs/moq-cli/src/client.rs Line 24:

session.close(moq_lite::Error::Cancel);
// session then drops → Drop fires → close() called again with "dropped"

The SessionInner::close signature is fn close(&self, ...), so there's no interior guard preventing the second call. Whether the underlying WebTransport layer handles idempotent close is implementation-defined.

One approach is to track whether close has already been called:

🔒️ Proposed fix using a closed flag
 pub struct Session {
 	session: Box<dyn SessionInner>,
+	closed: bool,
 }

 impl Session {
 	pub(super) fn new<S: web_transport_trait::Session>(session: S) -> Self {
-		Self { session: Box::new(session) }
+		Self { session: Box::new(session), closed: false }
 	}

 	pub fn close(&mut self, err: Error) {
+		if self.closed { return; }
+		self.closed = true;
 		self.session.close(err.to_code(), err.to_string().as_ref());
 	}
 }

 impl Drop for Session {
 	fn drop(&mut self) {
-		self.session.close(Error::Cancel.to_code(), "dropped");
+		self.close(Error::Cancel);
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/session.rs` around lines 22 - 38, The Session::close(&mut
self, err: Error) method must avoid causing Drop to call session.close a second
time; add a closed flag (e.g., a bool or AtomicBool) in the Session/SessionInner
and set it when closing, then early-return from Session::close and from the Drop
impl if already closed. Update Session::close to check-and-set the flag before
calling self.session.close(...) and change the Drop impl to only call
self.session.close(...) when the flag indicates not-yet-closed; keep using
SessionInner::close(...) for the actual close call.
rs/moq-mux/src/import/avc3.rs (1)

77-90: ⚠️ Potential issue | 🟠 Major

Handle catalog rollback if broadcast track creation fails.

Line 87 introduces a fallible step after catalog mutation. If create_track returns an error, the catalog update has already happened and can become stale/inconsistent with broadcast state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/import/avc3.rs` around lines 77 - 90, The code creates a
catalog track (catalog.video.create_track) and then calls
self.broadcast.create_track which can fail, leaving the catalog mutated but
broadcast not updated; modify the flow so if self.broadcast.create_track(track)
returns Err you undo the earlier catalog mutation by removing the created
catalog track (using the created track's identifier/info) before returning the
error. Concretely: after obtaining the created catalog track from
catalog.video.create_track("avc3", config.clone()), keep a handle/clone of that
created track's info/name, call self.broadcast.create_track(...), and on Err
call catalog.video.remove_track(&created_track_info) (while holding or
re-locking the same catalog lock as needed) and then propagate the error;
otherwise proceed to assign self.config and self.track as before.
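The rollback pattern this finding asks for can be sketched generically. This is a minimal, self-contained illustration, not the moq API: `Catalog`, `Broadcast`, `create_with_rollback`, and the `fail_next` flag are hypothetical stand-ins for the real `catalog.video.create_track` / `broadcast.create_track` pair. The same shape applies to the av01, opus, and fmp4 findings below.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the catalog and broadcast state.
#[derive(Default)]
struct Catalog {
    video: HashMap<String, String>, // track name -> codec config
}

#[derive(Default)]
struct Broadcast {
    tracks: Vec<String>,
    fail_next: bool, // simulates the fallible create_track
}

impl Broadcast {
    fn create_track(&mut self, name: &str) -> Result<(), String> {
        if self.fail_next {
            return Err("duplicate track".to_string());
        }
        self.tracks.push(name.to_string());
        Ok(())
    }
}

// Mutate the catalog first, then register with the broadcast;
// on failure, undo the catalog mutation before propagating the error
// so the two structures never diverge.
fn create_with_rollback(
    catalog: &mut Catalog,
    broadcast: &mut Broadcast,
    name: &str,
    config: &str,
) -> Result<(), String> {
    catalog.video.insert(name.to_string(), config.to_string());
    if let Err(err) = broadcast.create_track(name) {
        // Roll back the tentative catalog entry.
        catalog.video.remove(name);
        return Err(err);
    }
    Ok(())
}

fn main() {
    let mut catalog = Catalog::default();
    let mut broadcast = Broadcast::default();

    // Success path: both structures updated.
    assert!(create_with_rollback(&mut catalog, &mut broadcast, "avc3", "cfg").is_ok());
    assert!(catalog.video.contains_key("avc3"));

    // Failure path: the catalog entry is rolled back.
    broadcast.fail_next = true;
    assert!(create_with_rollback(&mut catalog, &mut broadcast, "av01", "cfg").is_err());
    assert!(!catalog.video.contains_key("av01"));
}
```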
rs/moq-mux/src/import/av01.rs (1)

99-107: ⚠️ Potential issue | 🟠 Major

Add rollback around fallible create_track calls.

At Lines 104, 147, and 235, catalog state is mutated before broadcast.create_track(...) can fail. If it fails, you can end up with stale catalog tracks (and in reinit paths, the previous track already removed). Please rollback catalog insertion (and avoid dropping old track until new track is confirmed).

Also applies to: 142-150, 231-237

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/import/av01.rs` around lines 99 - 107, You mutate the catalog
by inserting a new track via catalog.video.create_track before calling the
fallible broadcast.create_track, which can leave stale entries if broadcast
creation fails; change the sequence so you 1) do not drop or remove the existing
self.track until broadcast.create_track succeeds, 2) create the new track in a
tentative state or delay committing the catalog change (use a rollback path) —
specifically adjust the code around self.catalog.lock() and
catalog.video.create_track("av01", config.clone()) so that if
self.broadcast.create_track(track) returns an Err you remove the tentative
catalog entry (call the catalog/video removal API for the created track) and
preserve the old self.track, and only set self.config and self.track (and drop
the old track) after broadcast.create_track has succeeded; apply the same change
to the other occurrences at the blocks around lines where
catalog.video.create_track and self.broadcast.create_track are paired (the
blocks that reference catalog, catalog.video.create_track,
self.broadcast.create_track, self.config, and self.track).
rs/moq-mux/src/import/opus.rs (1)

45-61: ⚠️ Potential issue | 🟠 Major

Rollback catalog state on create_track failure.

At Line 60, broadcast.create_track can fail after the catalog track has already been created. Please remove/undo that catalog entry on error so metadata and broadcast state stay consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/import/opus.rs` around lines 45 - 61, The catalog track is
created with catalog.audio.create_track but if
self.broadcast.create_track(track) fails the catalog entry is left behind;
change the code to perform a rollback: keep the created catalog track in a local
variable (e.g., track), attempt
self.broadcast.create_track(track.clone_or_move_as_needed), and on Err call the
catalog removal API (e.g., catalog.audio.remove_track or delete_track using
track.name or track.id) while still holding the catalog lock, then return the
error; ensure you clone the track name/id before moving the track into
broadcast.create_track so you can reference it for removal.
rs/moq-mux/src/import/fmp4.rs (1)

176-197: ⚠️ Potential issue | 🟠 Major

Protect init from partial track-registration failures.

At Line 195, create_track is now fallible after catalog track creation. If this fails, the catalog can keep a track that was never registered in broadcast. Please add rollback on error.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/import/fmp4.rs` around lines 176 - 197, The code currently
creates a catalog track (catalog.video.create_track /
catalog.audio.create_track) and then calls self.broadcast.create_track(track)
but if broadcast.create_track fails the catalog track remains orphaned; fix by
performing rollback: after creating the catalog track (in the vide/soun match
arms) attempt self.broadcast.create_track(track) and only insert into
self.tracks (self.tracks.insert(..., Fmp4Track::new(...))) on success, and if
self.broadcast.create_track returns Err then remove the previously-created
catalog track (call the corresponding removal API on the same catalog object
used to create it, e.g. catalog.video.remove_track(track) or
catalog.audio.remove_track(track)) before returning the error; also handle or
log any error from the removal but propagate the original creation error. Ensure
you reference init_video/init_audio, catalog.video.create_track,
catalog.audio.create_track, self.broadcast.create_track, self.tracks.insert and
Fmp4Track::new when making the changes.
rs/moq-lite/src/ietf/subscriber.rs (1)

526-554: ⚠️ Potential issue | 🟠 Major

Rollback start_publish state when insert_track fails.

Line 553 can now fail after subscribes/publishes are inserted and announce state is started. That leaves stale state on error. Please remove those map entries and undo announce before returning the error.

Suggested rollback pattern
 		// Announce our namespace if we haven't already.
 		// NOTE: This is debated in the IETF draft, but is significantly easier to implement.
 		let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?;
-		broadcast.insert_track(track)?;
+		if let Err(err) = broadcast.insert_track(track) {
+			let mut state = self.state.lock();
+			state.subscribes.remove(&request_id);
+			state.publishes.remove(&request_id);
+			drop(state);
+			let _ = self.stop_announce(msg.track_namespace.to_owned());
+			return Err(err);
+		}
 
 		Ok(())
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 526 - 554, start_publish
currently inserts into state.subscribes and state.publishes and then calls
start_announce(...). If broadcast.insert_track(track) fails you must roll back
those changes and undo the announce before returning Err: on Err from
broadcast.insert_track(track) remove the previously-inserted TrackState from
state.subscribes and remove the request_id entry from state.publishes
(re-acquire the mutex as needed), then undo the announce via the
broadcast/announce API (e.g., remove the track or cancel/stop the broadcast
object returned by start_announce) and finally return the original error; apply
this rollback logic inside start_publish around the broadcast.insert_track call.
🧹 Nitpick comments (7)
rs/moq-clock/src/clock.rs (1)

59-65: LGTM — oversleep detection is correct.

The `let next = Utc::now()` intentionally shadows the computed `next` from lines 53–54, re-grounding `now` in actual wall-clock time rather than the calculated schedule. The minute-boundary break condition is sound.

One minor readability note: renaming the re-fetched value (e.g., actual) would eliminate the shadowing and make the intent clearer at a glance.

✨ Naming suggestion to remove shadowing
-            // Get the current time again to check if we overslept
-            let next = Utc::now();
-            if next.minute() != now.minute() {
+            // Get the current time again to check if we overslept
+            let actual = Utc::now();
+            if actual.minute() != now.minute() {
                 break;
             }
 
-            now = next;
+            now = actual;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-clock/src/clock.rs` around lines 59 - 65, The code currently reuses
the name `next` (shadowing the earlier computed `next` from the scheduling math)
when re-fetching wall-clock time; rename the re-fetched variable to something
like `actual` to remove shadowing and clarify intent: replace `let next =
Utc::now()` with `let actual = Utc::now()` and update the subsequent uses (`if
actual.minute() != now.minute()` and `now = actual`) so the earlier computed
`next` (from the schedule) remains distinct from the wall-clock check.
rs/moq-lite/src/model/state.rs (2)

153-173: Ordering concern in Producer::poll — closed state checked after polling.

In Producer::poll (lines 153–173), the closure f is called first, and the closed check happens only if f returns Pending. This means f can run against state that was just closed and observe stale data; however, once the closure returns Pending, the closed check at line 164 catches it, so the behavior is still correct.

However, the inverse ordering is used in poll_modify (line 109: closed checked before calling f). It would be worth documenting why Producer::poll intentionally lets f run even on closed state — likely so final data can still be read — to prevent a future maintainer from "fixing" the ordering.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/state.rs` around lines 153 - 173, Producer::poll calls
the closure f before checking the closed flag (unlike poll_modify which checks
closed first); this is intentional so callers can still observe final data when
the state transitions to closed. Add a short, explicit comment above the
Producer::poll implementation (referencing Producer::poll, poll_modify,
ProducerRef, and closed) that documents this ordering decision and its rationale
(allow final reads even when closed) so future maintainers don't change the
ordering accidentally; keep poll_modify behavior and only add the explanatory
comment near the poll function signature.

237-270: Remove or move commented-out ProducerWeak code.

~60 lines of commented-out code (ProducerWeak) adds noise. If this is planned for near-term use, consider gating it behind a feature flag or tracking it in an issue instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/state.rs` around lines 237 - 270, Remove the large
commented-out ProducerWeak block to reduce noise: delete the commented-out
struct ProducerWeak, its impls (upgrade and Clone) and any commented references
to Producer, Counts, state; if you intend to keep this for near-term use, move
the code behind a feature flag (e.g., a cfg feature) or extract it into a
separate file or issue-tracked patch so the main rs/moq-lite/src/model/state.rs
no longer contains ~60 lines of commented code.
rs/moq-lite/src/model/broadcast.rs (3)

140-148: requests.pop() serves tracks in LIFO order — may cause starvation.

Vec::pop() removes from the end, so if multiple consumers subscribe concurrently, the last request is served first. If the dynamic handler processes one request at a time and new requests keep arriving, earlier requests could be starved.

Consider using VecDeque with pop_front() for FIFO ordering, which matches typical subscription expectations.

Suggested change

In State:

-	requests: Vec<TrackProducer>,
+	requests: std::collections::VecDeque<TrackProducer>,

In poll_requested_track:

-		Poll::Ready(state.requests.pop())
+		Poll::Ready(state.requests.pop_front())

In subscribe_track:

-		state.requests.push(producer.clone());
+		state.requests.push_back(producer.clone());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 140 - 148, The current
implementation uses Vec::pop() in poll_requested_track which serves requests
LIFO and can starve older subscribers; change the State.requests collection from
Vec<...> to std::collections::VecDeque<...>, update any construction/uses (e.g.,
in State and subscribe_track) to push_back new requests, and replace
requests.pop() with requests.pop_front() inside poll_requested_track; also add
the VecDeque import and adjust types/signatures where State.requests is
referenced so FIFO ordering is preserved.

104-107: Unnecessary .clone() on err in close().

err is already owned and not used after the call. The clone allocates needlessly.

Fix
 pub fn close(&mut self, err: Error) -> Result<(), Error> {
-    self.state.close(err.clone())?;
+    self.state.close(err)?;
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 104 - 107, The close
function is cloning the owned err unnecessarily; update close to pass the owned
Error directly to self.state.close by replacing self.state.close(err.clone())
with self.state.close(err) (no clone), ensuring the function signature pub fn
close(&mut self, err: Error) remains the same and that the call to State::close
consumes the Error.

200-244: BroadcastConsumer holds Producer<State>, not Consumer<State> — intentional but may confuse readers.

subscribe_track needs mutable access to push requests and update the producers map, so a Producer<State> handle is required. This means BroadcastConsumer has the same mutation privileges as BroadcastProducer on the shared state. The naming is potentially misleading since "consumer" typically implies read-only access. A brief doc comment on the state field would help future maintainers understand why.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 200 - 244, The field
declaration state: Producer<State> on BroadcastConsumer (and its use in
subscribe_track) grants mutation privileges and can confuse readers expecting a
read-only "consumer"; add a concise doc comment above BroadcastConsumer::state
explaining that a Producer<State> handle is intentionally used to allow
subscribe_track to push requests and update state.producers (same mutation
rights as BroadcastProducer), and note the rationale and safety constraints so
future maintainers understand the design.
rs/moq-lite/src/model/frame.rs (1)

130-137: finish() doesn't mutate state — purely a validation gate.

This is intentional (the doc says "Optional"), but it's a bit surprising that finish() acquires the lock via modify() only to read remaining. Since modify() clones the closed result (checking liveness), this does serve a purpose: it ensures the producer hasn't been aborted. Good as-is, but the method could use borrow() instead to avoid the modify overhead if the modified flag / wake machinery isn't needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/frame.rs` around lines 130 - 137, finish() doesn't
actually mutate state and needn't call state.modify(); change its signature to
take &self (fn finish(&self) -> Result<()>) and replace the state.modify()? call
with a non-mutating read like self.state.borrow()? so you still check remaining
and the closed/liveness without triggering modify/wake semantics; keep the same
Error::WrongSize return when remaining != 0 and ensure you import/handle the
borrow result type as needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/hang/src/catalog/producer.rs`:
- Around line 90-97: The Drop path currently calls
self.catalog.to_string().expect("invalid catalog") which can panic during
unwinding; change it to handle serialization errors non‑panically: call
self.catalog.to_string() and match on the Result, and if Err, log the
serialization error (e.g. via tracing::error or similar) and skip writing the
frame while still finishing/closing the group safely; keep using the existing
append_group(), group.write_frame(), and group.finish() calls but ensure no
expect() remains in the Drop path.

In `@rs/moq-clock/src/clock.rs`:
- Around line 47-51: The loop is silently discarding errors from
segment.write_frame (both the initial write of base and the per-second write of
delta), causing the loop to keep sleeping and retrying after the consumer drops;
update the calls to segment.write_frame (in the loop and for base) to check the
Result and short-circuit on Err — e.g., handle the Err from segment.write_frame
by breaking/returning the error so the per-second loop stops immediately instead
of spinning until finish()?; locate the writes to segment.write_frame and the
surrounding loop/finish()? logic to implement this early exit.

In `@rs/moq-clock/src/main.rs`:
- Around line 99-101: The code currently uses the try operator on
broadcast.subscribe_track(&track) which will exit the app on any subscription
error; instead handle the Result explicitly in the announce/retry loop: call
broadcast.subscribe_track(&track) and match on Ok(track) to create clock =
Some(clock::Subscriber::new(track)), on Err(e) check for the recoverable
NotFound case (using the error type/method that identifies NotFound) and log a
warning/tracing and continue the loop, and for any other error return or
propagate it as a hard failure; remove the `?` and ensure only non-recoverable
errors bubble up.
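The recoverable-vs-fatal split above can be sketched like this. The `Error` enum, `subscribe`, and `try_subscribe` here are hypothetical simplifications of the moq-clock flow, showing only the match shape: NotFound is logged and retried, every other variant bubbles up.

```rust
// Hypothetical error type mirroring the recoverable/fatal split.
#[derive(Debug)]
enum Error {
    NotFound,
    Transport(String),
}

struct Track;
struct Subscriber;

impl Subscriber {
    fn new(_track: Track) -> Self {
        Subscriber
    }
}

// Simulated subscribe_track: may miss the track or hit a transport error.
fn subscribe(available: bool, broken: bool) -> Result<Track, Error> {
    if broken {
        return Err(Error::Transport("connection reset".to_string()));
    }
    if !available {
        return Err(Error::NotFound);
    }
    Ok(Track)
}

// Only NotFound is recoverable: log and keep looping; anything else is fatal.
fn try_subscribe(available: bool, broken: bool) -> Result<Option<Subscriber>, Error> {
    match subscribe(available, broken) {
        Ok(track) => Ok(Some(Subscriber::new(track))),
        Err(Error::NotFound) => {
            eprintln!("track not yet published; will retry on next announce");
            Ok(None)
        }
        Err(err) => Err(err), // hard failure bubbles up
    }
}

fn main() {
    assert!(matches!(try_subscribe(true, false), Ok(Some(_))));
    assert!(matches!(try_subscribe(false, false), Ok(None)));
    assert!(try_subscribe(false, true).is_err());
}
```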

In `@rs/moq-lite/src/ietf/publisher.rs`:
- Line 123: The call to broadcast.subscribe_track(&track) currently uses the ?
operator which would bubble a subscribe failure into a control-path error;
change it to explicitly match or map the Err case and convert the failure into
the appropriate protocol error (e.g., SubscribeError or
RequestError::SubscribeError) and return that protocol error to the
caller/session instead of propagating with ?, keeping the session alive; locate
the call to broadcast.subscribe_track(&track) and replace the ? usage with
mapping/matching that wraps the underlying error into the protocol-level
SubscribeError/RequestError and returns it as the request response.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 278-283: The cleanup call frame.abort(err.clone()) should not use
? because an abort failure could mask the original error from self.run_frame;
replace the fallible propagation with a non-propagating call (e.g., call .ok()
and ignore the result) so that run_frame's Err is returned unchanged from the
block where group.create_frame and self.run_frame are used (referencing the
run_frame call and frame.abort invocation in this section, mirroring the
existing cleanup pattern used in run_subscribe and recv_group).
- Around line 154-158: The select! block currently treats Ok(None) and Err(_)
the same and silently swallows errors; update the match on
broadcast.requested_track() so that Ok(Some(producer)) yields producer, Ok(None)
still breaks for clean shutdown, and Err(err) logs the error (using your crate's
logger/tracing) before breaking; locate the tokio::select! that assigns to track
from broadcast.requested_track() and add an explicit Err(err) arm that records
the error context and then exits the loop.

In `@rs/moq-lite/src/model/frame.rs`:
- Around line 88-100: poll_read_all can panic when called with index >=
self.chunks.len() (e.g., after read_all sets self.index = usize::MAX); add a
bounds check at the start of poll_read_all (and similarly in poll_read_chunks)
to return Poll::Ready(Bytes::new()) or an empty buffer when index >=
self.chunks.len(), keeping the existing remaining check and the rest of the
logic intact; reference the symbols poll_read_all, poll_read_chunks,
self.chunks, index, remaining, and read_all to locate and apply the guard.
- Around line 114-117: The FrameState initialization currently casts info.size
to usize which can truncate on 32-bit targets; change the constructor (the
function creating FrameState, e.g., new) to validate the file/frame size before
casting by using usize::try_from(info.size) (or similar) and return a
Result<Self, Error> on overflow, assigning the converted value to
FrameState.remaining instead of a direct cast; alternatively, if you choose not
to change the signature, add a debug_assert!(info.size <= usize::MAX as u64)
before the cast to catch issues in development—ensure the check is located where
FrameState { chunks: ..., remaining: ... } is created so write_chunk size
validation remains correct.
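The checked conversion can be sketched as below. `FrameState` and its constructor are simplified stand-ins for the real type; the key idea is `usize::try_from` in place of an `as usize` cast, which would silently truncate a large `u64` size on 32-bit targets.

```rust
// Hypothetical frame state whose remaining-byte counter is usize.
struct FrameState {
    remaining: usize,
}

impl FrameState {
    // Validate the u64 wire size before converting; a plain `as usize`
    // cast would silently truncate on 32-bit targets, so surface an
    // error instead.
    fn new(size: u64) -> Result<Self, String> {
        let remaining = usize::try_from(size)
            .map_err(|_| format!("frame size {size} exceeds usize::MAX"))?;
        Ok(Self { remaining })
    }
}

fn main() {
    assert_eq!(FrameState::new(1024).unwrap().remaining, 1024);
    // Note: on 64-bit targets every u64 fits in usize, so the error path
    // only triggers on 32-bit builds; no failing case is asserted here.
    assert!(FrameState::new(0).is_ok());
}
```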

In `@rs/moq-lite/src/model/state.rs`:
- Around line 113-127: poll_modify can spin if the closure f uses
ProducerMut::DerefMut (which sets ProducerMut::modified = true) but returns
Poll::Pending, because the ProducerMut drop will wake the waiter registered
afterwards; to fix, after detecting Poll::Pending and before
waiter.register(&mut state.waiters), clear/reset the modified flag on the
re-extracted producer state (i.e. access the ProducerState/ProducerMut modified
field and set it false) so the drop won't trigger an immediate wake, ensuring
waiter registration is stable; alter poll_modify accordingly (symbols:
poll_modify, ProducerMut, ProducerState.state, modified, waiter.register).

In `@rs/moq-lite/src/model/track.rs`:
- Around line 185-193: is_closed currently returns only self.state.borrow().fin
so aborted producers (where state.closed is Err(_)) are not treated as closed;
update Track::is_closed to detect either finish or abort by checking the state's
closed flag as well (e.g. return true if fin is true OR state.closed is
Err/_is_some_), referencing Track::is_closed, the TrackState fields fin and
closed, and the abort/finish paths; if visibility prevents direct access, expose
a method on ProducerRef/TrackState (e.g., is_closed_internal or use
modify().is_err()) to read the closed state and use that in is_closed.

In `@rs/moq-relay/src/web.rs`:
- Line 288: The call to broadcast.subscribe_track currently maps all errors to
StatusCode::NOT_FOUND; change the map_err on broadcast.subscribe_track(&track)
to inspect the error variants instead and return NOT_FOUND only for the "not
found" variant, while mapping Transport, ProtocolViolation, Duplicate (and other
server-side variants) to StatusCode::INTERNAL_SERVER_ERROR — mirror the pattern
used for group fetching (the same match-based error->StatusCode mapping) so
subscribe_track's errors are accurately surfaced.
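The variant-aware mapping can be sketched as a small match. The `Error` and `StatusCode` enums here are hypothetical mirrors of the real types, showing the shape: only the missing-track case maps to 404, while server-side variants surface as 500.

```rust
// Hypothetical mirror of the subscribe error variants.
#[derive(Debug)]
enum Error {
    NotFound,
    Transport,
    ProtocolViolation,
    Duplicate,
}

#[derive(Debug, PartialEq)]
enum StatusCode {
    NotFound,
    InternalServerError,
}

// Only the missing-track case is the client's fault; server-side
// failures map to 500 so operators can distinguish them in logs.
fn to_status(err: &Error) -> StatusCode {
    match err {
        Error::NotFound => StatusCode::NotFound,
        Error::Transport | Error::ProtocolViolation | Error::Duplicate => {
            StatusCode::InternalServerError
        }
    }
}

fn main() {
    assert_eq!(to_status(&Error::NotFound), StatusCode::NotFound);
    assert_eq!(to_status(&Error::Transport), StatusCode::InternalServerError);
    assert_eq!(to_status(&Error::Duplicate), StatusCode::InternalServerError);
}
```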

remains the same and that the call to State::close consumes the Error.
- Around line 200-244: The field declaration state: Producer<State> on
BroadcastConsumer (and its use in subscribe_track) grants mutation privileges
and can confuse readers expecting a read-only "consumer"; add a concise doc
comment above BroadcastConsumer::state explaining that a Producer<State> handle
is intentionally used to allow subscribe_track to push requests and update
state.producers (same mutation rights as BroadcastProducer), and note the
rationale and safety constraints so future maintainers understand the design.

In `@rs/moq-lite/src/model/frame.rs`:
- Around line 130-137: finish() doesn't actually mutate state and needn't call
state.modify(); change its signature to take &self (fn finish(&self) ->
Result<()>) and replace the state.modify()? call with a non-mutating read like
self.state.borrow()? so you still check remaining and the closed/liveness
without triggering modify/wake semantics; keep the same Error::WrongSize return
when remaining != 0 and ensure you import/handle the borrow result type as
needed.

In `@rs/moq-lite/src/model/state.rs`:
- Around line 153-173: Producer::poll calls the closure f before checking the
closed flag (unlike poll_modify which checks closed first); this is intentional
so callers can still observe final data when the state transitions to closed.
Add a short, explicit comment above the Producer::poll implementation
(referencing Producer::poll, poll_modify, ProducerRef, and closed) that
documents this ordering decision and its rationale (allow final reads even when
closed) so future maintainers don't change the ordering accidentally; keep
poll_modify behavior and only add the explanatory comment near the poll function
signature.
- Around line 237-270: Remove the large commented-out ProducerWeak block to
reduce noise: delete the commented-out struct ProducerWeak, its impls (upgrade
and Clone) and any commented references to Producer, Counts, state; if you
intend to keep this for near-term use, move the code behind a feature flag
(e.g., a cfg feature) or extract it into a separate file or issue-tracked patch
so the main rs/moq-lite/src/model/state.rs no longer contains ~60 lines of
commented code.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f1e289 and a4d3f2c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (37)
  • Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/producer.rs
  • rs/hang/src/container/frame.rs
  • rs/hang/src/container/producer.rs
  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/publish.rs
  • rs/moq-cli/src/client.rs
  • rs/moq-cli/src/publish.rs
  • rs/moq-clock/src/clock.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-lite/Cargo.toml
  • rs/moq-lite/src/error.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/mod.rs
  • rs/moq-lite/src/model/state.rs
  • rs/moq-lite/src/model/track.rs
  • rs/moq-lite/src/model/waiter.rs
  • rs/moq-lite/src/session.rs
  • rs/moq-mux/src/import/aac.rs
  • rs/moq-mux/src/import/av01.rs
  • rs/moq-mux/src/import/avc3.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/hev1.rs
  • rs/moq-mux/src/import/opus.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/src/log.rs
  • rs/moq-native/src/server.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/web.rs
💤 Files with no reviewable changes (1)
  • rs/moq-lite/Cargo.toml

Comment on lines +90 to +97
 		let Ok(mut group) = self.track.append_group() else {
 			return;
 		};
 
 		// TODO decide if this should return an error, or be impossible to fail
 		let frame = self.catalog.to_string().expect("invalid catalog");
-		group.write_frame(frame);
-		group.close();
+		let _ = group.write_frame(frame);
+		let _ = group.finish();

⚠️ Potential issue | 🟠 Major

Avoid panicking in Drop for catalog serialization.

This drop path suppresses transport/state errors (good), but it can still panic via expect("invalid catalog"), which is risky inside Drop and may abort during unwinding.

Proposed non-panicking drop-path handling
 		let Ok(mut group) = self.track.append_group() else {
 			return;
 		};

-		// TODO decide if this should return an error, or be impossible to fail
-		let frame = self.catalog.to_string().expect("invalid catalog");
-		let _ = group.write_frame(frame);
-		let _ = group.finish();
+		let Ok(frame) = self.catalog.to_string() else {
+			tracing::warn!("failed to serialize catalog during drop");
+			return;
+		};
+		if let Err(err) = group.write_frame(frame) {
+			tracing::warn!(%err, "failed to publish catalog frame during drop");
+			return;
+		}
+		if let Err(err) = group.finish() {
+			tracing::warn!(%err, "failed to finish catalog group during drop");
+		}
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let Ok(mut group) = self.track.append_group() else {
return;
};
// TODO decide if this should return an error, or be impossible to fail
let frame = self.catalog.to_string().expect("invalid catalog");
group.write_frame(frame);
group.close();
let _ = group.write_frame(frame);
let _ = group.finish();
let Ok(mut group) = self.track.append_group() else {
return;
};
let Ok(frame) = self.catalog.to_string() else {
tracing::warn!("failed to serialize catalog during drop");
return;
};
if let Err(err) = group.write_frame(frame) {
tracing::warn!(%err, "failed to publish catalog frame during drop");
return;
}
if let Err(err) = group.finish() {
tracing::warn!(%err, "failed to finish catalog group during drop");
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/catalog/producer.rs` around lines 90 - 97, The Drop path
currently calls self.catalog.to_string().expect("invalid catalog") which can
panic during unwinding; change it to handle serialization errors non‑panically:
call self.catalog.to_string() and match on the Result, and if Err, log the
serialization error (e.g. via tracing::error or similar) and skip writing the
frame while still finishing/closing the group safely; keep using the existing
append_group(), group.write_frame(), and group.finish() calls but ensure no
expect() remains in the Drop path.

Comment on lines 99 to 101
 							tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
-							let track = broadcast.subscribe_track(&track);
+							let track = broadcast.subscribe_track(&track)?;
 							clock = Some(clock::Subscriber::new(track));

⚠️ Potential issue | 🟠 Major

Avoid terminating the subscriber loop on recoverable subscribe_track failures.

At Line 100, ? exits the whole app on the first subscription error. Given this is inside the announce/retry loop, treat NotFound as retryable and continue waiting; only bubble up hard failures.

💡 Suggested adjustment
-							let track = broadcast.subscribe_track(&track)?;
-							clock = Some(clock::Subscriber::new(track));
+							match broadcast.subscribe_track(&track) {
+								Ok(track_consumer) => {
+									clock = Some(clock::Subscriber::new(track_consumer));
+								}
+								Err(moq_lite::Error::NotFound) => {
+									tracing::warn!(broadcast = %path, "track not available yet, waiting for next announce");
+									continue;
+								}
+								Err(err) => return Err(err).context("failed to subscribe to track"),
+							}
📝 Committable suggestion


Suggested change
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let track = broadcast.subscribe_track(&track);
let track = broadcast.subscribe_track(&track)?;
clock = Some(clock::Subscriber::new(track));
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
match broadcast.subscribe_track(&track) {
Ok(track_consumer) => {
clock = Some(clock::Subscriber::new(track_consumer));
}
Err(moq_lite::Error::NotFound) => {
tracing::warn!(broadcast = %path, "track not available yet, waiting for next announce");
continue;
}
Err(err) => return Err(err).context("failed to subscribe to track"),
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-clock/src/main.rs` around lines 99 - 101, The code currently uses the
try operator on broadcast.subscribe_track(&track) which will exit the app on any
subscription error; instead handle the Result explicitly in the announce/retry
loop: call broadcast.subscribe_track(&track) and match on Ok(track) to create
clock = Some(clock::Subscriber::new(track)), on Err(e) check for the recoverable
NotFound case (using the error type/method that identifies NotFound) and log a
warning/tracing and continue the loop, and for any other error return or
propagate it as a hard failure; remove the `?` and ensure only non-recoverable
errors bubble up.

@kixelated kixelated enabled auto-merge (squash) February 25, 2026 19:08
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 171-176: When decrementing state.dynamic in the drop path (inside
the self.state.modify() block), clearing state.requests alone leaves requested
tracks alive via state.producers/cleanup-task clones; update the block so that
when state.dynamic reaches zero you also abort/terminate all outstanding
requested tracks held by producers and the cleanup task: iterate the pending
requests (state.requests) and for each associated producer in state.producers
send a terminal error or call the producer/cleanup abort method so subscribers
receive a terminal response rather than hanging. Ensure this change is made next
to the existing state.dynamic/saturating_sub logic and references
state.requests, state.producers, and the cleanup/producer abort method used by
Producer::dynamic().

In `@rs/moq-lite/src/model/track.rs`:
- Around line 148-151: The finish() method should mark the track as closed, not
just set state.fin; update pub fn finish(&mut self) to also close the state via
state.close(Error::Closed) (in the same mutable state obtained via self.state.modify()?) so
TrackConsumer::closed() doesn't remain blocked after an explicit finish, and
after setting state.closed ensure you trigger the same wake/notify mechanism
used when producer clones are dropped so waiting consumers are unblocked.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4d3f2c and 44aeb68.

📒 Files selected for processing (11)
  • rs/moq-clock/src/clock.rs
  • rs/moq-lite/Cargo.toml
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/moq-lite/src/model/state.rs
  • rs/moq-lite/src/model/track.rs
  • rs/moq-lite/src/session.rs
  • rs/moq-relay/src/web.rs
💤 Files with no reviewable changes (1)
  • rs/moq-lite/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (3)
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-clock/src/clock.rs
  • rs/moq-relay/src/web.rs

Comment on lines +148 to +151
pub fn finish(&mut self) -> Result<()> {
let mut state = self.state.modify()?;
state.fin = true;
Ok(())

⚠️ Potential issue | 🟠 Major

finish() should transition the track into a closed state.

finish() currently sets fin but leaves state.closed open. That can make TrackConsumer::closed() block after an explicit finish until producer clones are dropped, which is not expected lifecycle behavior.

Proposed fix
 pub fn finish(&mut self) -> Result<()> {
 	let mut state = self.state.modify()?;
 	state.fin = true;
+	state.close(Error::Closed);
 	Ok(())
 }
📝 Committable suggestion


Suggested change
pub fn finish(&mut self) -> Result<()> {
let mut state = self.state.modify()?;
state.fin = true;
Ok(())
pub fn finish(&mut self) -> Result<()> {
let mut state = self.state.modify()?;
state.fin = true;
state.closed = Err(Error::Closed);
Ok(())
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/track.rs` around lines 148 - 151, The finish() method
should mark the track as closed, not just set state.fin; update pub fn
finish(&mut self) to also close the state via state.close(Error::Closed) (in the same mutable state
obtained via self.state.modify()?) so TrackConsumer::closed() doesn't remain
blocked after an explicit finish, and after setting state.closed ensure you
trigger the same wake/notify mechanism used when producer clones are dropped so
waiting consumers are unblocked.

BroadcastConsumer now holds Consumer<State> instead of Producer<State>,
so dropping all real producers correctly triggers auto-close. The
broadcast's track HashMap stores TrackWeak (non-counting) references
for deduplication, and Consumer::produce() enables temporary upgrades
when mutation is needed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
rs/moq-lite/src/model/track.rs (1)

150-154: finish() sets fin but does not close the state — consumers' closed().await won't resolve until the producer is dropped.

This was flagged in a past review suggesting state.close(Error::Closed) be added. The current design relies on the producer being dropped shortly after finish() to trigger the Dropped error that resolves closed().await. This separation is reasonable if the intent is that finish only signals "no more new groups" while the track remains readable, but it means closed().await has no way to distinguish between "finished" and "producer still active."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/track.rs` around lines 150 - 154, The finish() method
currently sets state.fin = true but does not close the shared state, so
consumers awaiting closed().await never resolve until the producer is dropped;
update Track::finish to call state.close(Error::Closed) (or the appropriate
error variant used in the crate) after setting fin so the state transitions to
closed and closed().await resolves; locate the finish method in track.rs and
invoke the existing state.close(...) API (matching the error type used
elsewhere) instead of relying on Drop to produce a Dropped error.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/state.rs (2)

250-263: Weak::produce has a benign TOCTOU between closed-check and count increment.

Between unlocking (Line 255) and fetch_add (Line 257), the last Producer could drop and set closed = Err(Dropped). The returned Producer will fail on every modify() call, which is acceptable. However, documenting this would prevent future confusion.

Suggested doc comment
 pub fn produce(&self) -> Result<Producer<T>, Error> {
+	// NOTE: There's a small TOCTOU window where the state could close
+	// between the check and the count increment. The returned Producer
+	// will simply fail on all modify() calls in that case.
 	{
 		let state = self.state.lock();
 		state.closed.clone()?;
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/state.rs` around lines 250 - 263, Weak::produce
contains a benign TOCTOU between the unlocked check of state.closed and the
subsequent counts.producers.fetch_add; document this by adding a short doc
comment on Weak::produce (or atop the impl) stating that the closed check on
state.closed is performed while holding the lock but the counts increment
happens after unlocking, so a Producer created here may immediately observe
closed = Err(Dropped) and thus have modify() fail — this is intentional and
acceptable behavior, so callers should expect that a returned Producer can still
fail on modify().

499-527: ProducerConsumer fields are pub — consider restricting visibility.

Both producer and consumer are public fields. If callers can independently drop or replace them, the shared state invariants (e.g., consumer count) could be violated. If this is intentional for ergonomics, a doc comment explaining the contract would help.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/state.rs` around lines 499 - 527, The ProducerConsumer
struct currently exposes its producer and consumer fields as pub which allows
callers to drop/replace them and break internal invariants; change the fields to
private (remove pub on producer and consumer) and add controlled accessors such
as methods producer(&self) -> &Producer<T> and consumer(&self) -> &Consumer<T>
or explicit take/replace APIs if mutation is required, update the impls that
reference the fields (ProducerConsumer::new, Default, Clone) to use the new
private fields and accessors, and if you intentionally want public fields
instead document the invariants in a clear doc comment on ProducerConsumer
explaining the allowed usage and lifetime/ownership rules.
rs/moq-lite/src/model/group.rs (1)

108-116: write_frame doesn't call finish() on the created frame — relies on Producer drop ordering.

When the FrameProducer returned by create_frame is dropped at the end of this function, Producer::drop sets closed = Err(Dropped). Frame consumers still receive the data correctly because Consumer::poll checks the closure's Poll::Ready result before checking the closed flag (state.rs Line 156-157). So the data is accessible despite the Dropped error state.

This works but is subtle. Consider calling frame.finish()? for clarity and consistency with the explicit lifecycle pattern used elsewhere (e.g., ietf/subscriber.rs Line 441, lite/subscriber.rs Line 289).

Suggested change
 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) -> Result<()> {
 	let data = frame.into();
 	let frame = Frame {
 		size: data.len() as u64,
 	};
 	let mut frame = self.create_frame(frame)?;
 	frame.write_chunk(data)?;
+	frame.finish()?;
 	Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/group.rs` around lines 108 - 116, write_frame currently
creates a FrameProducer via create_frame and writes chunks but never calls
finish(), relying on Producer::drop to mark completion; explicitly call finish()
on the produced frame (the FrameProducer returned by create_frame) after
write_chunk to properly complete the frame lifecycle and surface any errors
(i.e., replace implicit drop with frame.finish()? where frame is the
FrameProducer) to match patterns used in other modules (see FrameProducer,
create_frame, Producer::drop, and the explicit finish usage in subscriber code).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 287-291: The select arm currently swallows both Ok(None) and
Err(_) with `_ => break`; change the match on broadcast.requested_track() so it
explicitly handles Err(err) and logs the error before breaking while keeping
Ok(None) as the clean shutdown break and Ok(Some(producer) => producer as
before. Locate the tokio::select! using broadcast.requested_track() (the let
track = ... block) and replace `_ => break` with two arms: `Ok(None) => break`
and `Err(err) => { <log the error via the module's logger/tracing (as done in
lite/subscriber.rs)>; break }`, mirroring the error-logging behavior from
rs/moq-lite/src/lite/subscriber.rs.

In `@rs/moq-lite/src/model/state.rs`:
- Around line 136-148: poll_unused currently checks consumers with Relaxed
ordering before acquiring the lock, which allows a TOCTOU where the last
Consumer drops between the check and waiter registration and the newly
registered waiter is never woken; fix this by, after calling
waiter.register(&mut state.waiters) while holding the lock, re-checking the
consumer count (e.g. consumers.load(Ordering::Relaxed) or stronger ordering) and
if it is zero then unregister/remove the waiter and return Poll::Ready(Ok(()))
instead of Poll::Pending; apply the identical change to Weak::poll_unused to
avoid the same race there (use the same register/re-check/unregister pattern
around waiter.register and return Ready when count is zero).

In `@rs/moq-lite/src/model/track.rs`:
- Around line 104-120: create_group currently allows groups with sequence >
max_sequence after finish while append_group rejects all new groups post-finish;
make behavior consistent by having create_group check state.fin up front and
return Err(Error::Closed) (same as append_group/finish behavior) before any
duplicate/sequence logic, update the check in create_group (referencing
create_group, state.fin, max_sequence, Error::Closed) and adjust/add tests or
docs if you instead intend the opposite policy (only accept sequence <=
max_sequence).
- Around line 228-249: TrackWeak::consume currently sets index: 0 which makes
subscribers start at group 0; change it to mirror TrackProducer::consume by
computing the start index from the returned consumer's offset and groups length
(e.g., index = consumer.offset + consumer.groups.len().saturating_sub(1)) so the
weak consumer starts at the latest group. Update the TrackWeak::consume
implementation (use the same consumer variable returned from
self.state.consume() and set TrackConsumer.index accordingly) to match
TrackProducer::consume behavior.

---

Duplicate comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 150-154: The finish() method currently sets state.fin = true but
does not close the shared state, so consumers awaiting closed().await never
resolve until the producer is dropped; update Track::finish to call
state.close(Error::Closed) (or the appropriate error variant used in the crate)
after setting fin so the state transitions to closed and closed().await
resolves; locate the finish method in track.rs and invoke the existing
state.close(...) API (matching the error type used elsewhere) instead of relying
on Drop to produce a Dropped error.

---

Nitpick comments:
In `@rs/moq-lite/src/model/group.rs`:
- Around line 108-116: write_frame currently creates a FrameProducer via
create_frame and writes chunks but never calls finish(), relying on
Producer::drop to mark completion; explicitly call finish() on the produced
frame (the FrameProducer returned by create_frame) after write_chunk to properly
complete the frame lifecycle and surface any errors (i.e., replace implicit drop
with frame.finish()? where frame is the FrameProducer) to match patterns used in
other modules (see FrameProducer, create_frame, Producer::drop, and the explicit
finish usage in subscriber code).

In `@rs/moq-lite/src/model/state.rs`:
- Around line 250-263: Weak::produce contains a benign TOCTOU between the
unlocked check of state.closed and the subsequent counts.producers.fetch_add;
document this by adding a short doc comment on Weak::produce (or atop the impl)
stating that the closed check on state.closed is performed while holding the
lock but the counts increment happens after unlocking, so a Producer created
here may immediately observe closed = Err(Dropped) and thus have modify() fail —
this is intentional and acceptable behavior, so callers should expect that a
returned Producer can still fail on modify().
- Around line 499-527: The ProducerConsumer struct currently exposes its
producer and consumer fields as pub which allows callers to drop/replace them
and break internal invariants; change the fields to private (remove pub on
producer and consumer) and add controlled accessors such as methods
producer(&self) -> &Producer<T> and consumer(&self) -> &Consumer<T> or explicit
take/replace APIs if mutation is required, update the impls that reference the
fields (ProducerConsumer::new, Default, Clone) to use the new private fields and
accessors, and if you intentionally want public fields instead document the
invariants in a clear doc comment on ProducerConsumer explaining the allowed
usage and lifetime/ownership rules.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 44aeb68 and 1ce3051.

📒 Files selected for processing (9)
  • rs/hang/examples/video.rs
  • rs/moq-cli/src/publish.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/state.rs
  • rs/moq-lite/src/model/track.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-cli/src/publish.rs

Comment on lines 104 to 120
pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
let group = info.produce();
self.insert_group(group.consume()).then_some(group)
}

/// Create a new group with the next sequence number.
pub fn append_group(&mut self) -> GroupProducer {
let mut producer = None;
let mut state = self.state.modify()?;
if state.fin && state.max_sequence.unwrap_or(0) >= group.info.sequence {
return Err(Error::Closed);
}

self.state.send_if_modified(|state| {
assert!(state.closed.is_none());
if !state.duplicates.insert(group.info.sequence) {
return Err(Error::Duplicate);
}

let now = tokio::time::Instant::now();
state.trim(now);
state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
state.groups.push_back(group.clone());

let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1);
let group = Group { sequence }.produce();
state.groups.push_back((now, group.consume()));
state.max_sequence = Some(sequence);
Ok(group)
}

⚠️ Potential issue | 🟡 Minor

create_group allows new groups with higher sequences after finish(), but append_group does not.

After finish(), create_group only rejects groups where sequence <= max_sequence (Line 108). Groups with higher sequences are accepted. However, append_group unconditionally rejects after finish() (Line 125-127).

This inconsistency may be intentional (allowing out-of-order "old" groups to arrive from concurrent producers), but there's a data-loss risk: consumers that already received None from poll_next_group (because fin == true) will never see subsequently added groups.

If the intent is to allow late-arriving out-of-order groups, consider documenting this behavior. If only groups with sequence <= max_sequence should be accepted post-finish, the check at Line 108 is correct. If no groups at all should be accepted, the fin check alone suffices.

Also applies to: 123-137

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/track.rs` around lines 104 - 120, create_group
currently allows groups with sequence > max_sequence after finish while
append_group rejects all new groups post-finish; make behavior consistent by
having create_group check state.fin up front and return Err(Error::Closed) (same
as append_group/finish behavior) before any duplicate/sequence logic, update the
check in create_group (referencing create_group, state.fin, max_sequence,
Error::Closed) and adjust/add tests or docs if you instead intend the opposite
policy (only accept sequence <= max_sequence).

@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-lite/src/ietf/subscriber.rs (1)

530-560: ⚠️ Potential issue | 🟡 Minor

start_publish: insert_track failure leaves stale state entries — need cleanup.

If broadcast.insert_track(&track)? fails (e.g., returns Err(Error::Duplicate) or fails from state.modify()?), the entries inserted into state.subscribes and state.publishes at lines 541–551 remain in the maps. These stale entries are never cleaned up, as cleanup mechanisms (recv_subscribe_error, recv_request_error, recv_publish_done) depend on messages from the remote side that won't arrive after a local failure.

Suggested fix: clean up state on insert_track failure
 	// Announce our namespace if we haven't already.
 	// NOTE: This is debated in the IETF draft, but is significantly easier to implement.
 	let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?;
-	broadcast.insert_track(&track)?;
+	if let Err(err) = broadcast.insert_track(&track) {
+		let mut state = self.state.lock();
+		state.subscribes.remove(&request_id);
+		state.publishes.remove(&request_id);
+		return Err(err);
+	}

 	Ok(())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 530 - 560, In start_publish,
when broadcast.insert_track(&track)? fails it currently leaves entries in
state.subscribes and state.publishes; change start_publish so that after
inserting the TrackState and publishes entry but before returning the error from
broadcast.insert_track, you catch the error, re-lock self.state and remove the
request_id from both state.subscribes and state.publishes (undo the earlier
inserts), then propagate the original error; reference the symbols
start_publish, TrackState, state.subscribes, state.publishes, start_announce,
and broadcast.insert_track to locate where to add the cleanup on failure.
🧹 Nitpick comments (1)
rs/moq-lite/src/model/state.rs (1)

218-237: Weak::produce error-path uses Relaxed for the compensating fetch_sub.

Lines 223 and 228 both use Ordering::Relaxed. The increment at line 223 is safe because the RMW total modification order ensures Producer::drop will see it. However, on the error path (line 228), the compensating fetch_sub also uses Relaxed, which means another concurrent Weak::produce or Producer::drop might briefly observe an inflated count. This is benign in practice (the lock guards actual state transitions), but for symmetry with Producer::drop / Consumer::drop which use AcqRel, consider using AcqRel here too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/state.rs` around lines 218 - 237, The compensating
decrement in Weak::produce currently uses counts.producers.fetch_sub(...,
Ordering::Relaxed) which can transiently leave an inflated count; change that
specific fetch_sub call to use Ordering::AcqRel to match the acquire-release
semantics used by Producer::drop and Consumer::drop (leave the initial fetch_add
as-is Relaxed), i.e. replace the Ordering::Relaxed argument on the error-path
fetch_sub with Ordering::AcqRel so the decrement has appropriate
synchronization.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1ce3051 and 0a1b81d.

📒 Files selected for processing (5)
  • .vscode/settings.json
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/state.rs
  • rs/moq-lite/src/model/track.rs

@kixelated kixelated merged commit a0f55d0 into main Feb 25, 2026
1 check passed
@kixelated kixelated deleted the fetch-boye branch February 25, 2026 23:21
@moq-bot moq-bot bot mentioned this pull request Feb 25, 2026