Implement compression for RPC messages with backward compatibility #867
📝 Walkthrough
This PR introduces RPC payload compression support to LiveKit using Zstd compression. It threads a client protocol version through the participant creation chain, adds compression utilities based on payload size thresholds, conditionally compresses/decompresses RPC messages based on destination protocol support, and adds a raw binary frame sending capability to the signal stream. A query parameter is appended to signal client initialization to indicate compression support.
Sequence Diagrams
sequenceDiagram
participant LocalApp as Local Participant
participant Compression as Compression Layer
participant Stream as Signal Stream
participant RTCSession as RTC Session
participant Decompression as Decompression Layer
participant RemoteApp as Remote Handler
LocalApp->>Compression: publish_rpc_request(payload)
alt destination supports compression
Compression->>Compression: compress_rpc_payload_bytes()
Compression-->>LocalApp: compressed_payload
else
Compression-->>LocalApp: payload
end
LocalApp->>Stream: send_raw(binary message)
Stream->>RTCSession: deliver message
RTCSession->>Decompression: process RpcRequest/Response
alt has compressed_payload
Decompression->>Decompression: decompress_rpc_payload_bytes()
alt decompression succeeds
Decompression-->>RTCSession: payload (decompressed)
else
Decompression-->>RTCSession: payload (fallback)
end
else
Decompression-->>RTCSession: payload (regular)
end
RTCSession->>RemoteApp: emit RPC message
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@livekit/src/room/participant/local_participant.rs`:
- Around line 37-39: CI failed because formatting in the import list of
livekit::room::participant::local_participant.rs doesn't match rustfmt; run
`cargo fmt` (or your IDE's Rust formatter) to reformat the crate so `cargo fmt
-- --check` passes, then commit the updated formatting for the module containing
the import list that references compress_rpc_payload_bytes, RpcError,
RpcErrorCode, RpcInvocationData, and MAX_PAYLOAD_BYTES.
In `@livekit/src/room/participant/rpc.rs`:
- Around line 185-201: decompress_rpc_payload_bytes currently uses
zstd::decode_all which can allocate an unbounded buffer and be exploited to
cause OOM; modify decompress_rpc_payload_bytes to use a streaming zstd decoder
(e.g., zstd::stream::read::Decoder) or a similar bounded read and read into a
buffer with an enforced cap (use MAX_PAYLOAD_BYTES or a defined reasonable
limit) so that if the decompressed data would exceed the cap you return an Err;
after reading up to the cap, convert the bytes to UTF-8 and return the String or
an appropriate error if UTF-8 conversion fails or the size limit was exceeded.
In `@livekit/src/rtc_engine/rtc_session.rs`:
- Around line 1201-1209: When decompress_rpc_payload_bytes returns Err(e) for a
proto::rpc_response::Value::CompressedPayload branch, do not return (None,
None); instead construct and return an RpcError (or the module's RpcResponse
error variant) so callers can distinguish decompression failure from an empty
successful response; modify the Err(e) arm in rtc_session.rs to log the error
and synthesize an RpcError/RpcResponse error (including the decompress error
message and context) and return (None, Some(rpc_error)) where RpcError is the
same type used elsewhere for RPC failures.
🧹 Nitpick comments (2)
livekit/src/room/participant/local_participant.rs (2)
833-840: Consider using `log::warn!` instead of `log::error!` for expected operational conditions.
Many of these error paths represent expected operational scenarios (timeouts, disconnections, payload too large, remote-side RPC errors). Logging all of them at `error` level will be noisy in production and dilute the signal of actual errors. Reserve `log::error!` for truly unexpected failures (e.g., the handler panic at line 1059) and use `log::warn!` for the rest.
Also applies to: 900-907, 921-928, 938-944, 949-957
1083-1098: Response payload size check does not account for compression.
The payload size check at line 1084 (`response_payload.len() <= MAX_PAYLOAD_BYTES`) is performed on the uncompressed payload, but `publish_rpc_response` may subsequently compress it. If the intent is to limit wire size, this check should happen after compression. If the intent is to limit logical payload size regardless of compression, this is fine but worth a comment to clarify intent.
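The two readings of the limit can be sketched as follows. This is a minimal illustration, not the SDK's code: the 15 KiB value given to `MAX_PAYLOAD_BYTES` and both helper names are assumptions; only the comparison against `MAX_PAYLOAD_BYTES` comes from the review comment.

```rust
/// Assumed value for illustration; the real constant lives in the SDK's rpc module.
const MAX_PAYLOAD_BYTES: usize = 15 * 1024;

/// Logical limit (hypothetical helper): checks the uncompressed payload,
/// regardless of what is eventually sent on the wire.
fn within_logical_limit(payload: &str) -> bool {
    payload.len() <= MAX_PAYLOAD_BYTES
}

/// Wire limit (hypothetical helper): checks the bytes actually sent,
/// i.e. the compressed form when one exists, otherwise the raw payload.
fn within_wire_limit(payload: &str, compressed: Option<&[u8]>) -> bool {
    let wire_len = compressed.map_or(payload.len(), |c| c.len());
    wire_len <= MAX_PAYLOAD_BYTES
}
```

A payload that is too large logically can still fit on the wire once compressed, which is why the review asks for the intent to be stated explicitly.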
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
livekit-api/src/signal_client/mod.rs
livekit-api/src/signal_client/signal_stream.rs
livekit/Cargo.toml
livekit/src/room/mod.rs
livekit/src/room/participant/local_participant.rs
livekit/src/room/participant/mod.rs
livekit/src/room/participant/remote_participant.rs
livekit/src/room/participant/rpc.rs
livekit/src/rtc_engine/rtc_session.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
Applied to files:
livekit-api/src/signal_client/signal_stream.rs
🧬 Code graph analysis (5)
livekit/src/room/participant/mod.rs (2)
livekit/src/room/participant/local_participant.rs (1)
client_protocol(777-779)
livekit/src/room/participant/remote_participant.rs (1)
client_protocol(523-525)
livekit-api/src/signal_client/signal_stream.rs (1)
livekit-api/src/signal_client/mod.rs (2)
send(200-202)send(338-351)
livekit/src/room/participant/remote_participant.rs (1)
livekit/src/room/participant/local_participant.rs (1)
client_protocol(777-779)
livekit/src/rtc_engine/rtc_session.rs (1)
livekit/src/room/participant/rpc.rs (1)
decompress_rpc_payload_bytes(185-201)
livekit/src/room/mod.rs (2)
livekit/src/room/participant/local_participant.rs (1)
client_protocol(777-779)
livekit/src/room/participant/remote_participant.rs (1)
client_protocol(523-525)
🪛 GitHub Actions: Rust Formatting
livekit/src/room/participant/mod.rs
[error] 1-1: cargo fmt -- --check detected formatting differences in one or more Rust source files. Run 'cargo fmt' to fix code style issues.
livekit/src/room/participant/rpc.rs
[error] 1-1: cargo fmt -- --check detected formatting differences in one or more Rust source files. Run 'cargo fmt' to fix code style issues.
livekit/src/room/mod.rs
[error] 1-1: cargo fmt -- --check detected formatting differences in one or more Rust source files. Run 'cargo fmt' to fix code style issues.
livekit/src/room/participant/local_participant.rs
[error] 1-1: cargo fmt -- --check detected formatting differences in one or more Rust source files. Run 'cargo fmt' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Build (x86_64-pc-windows-msvc)
- GitHub Check: Build (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-apple-darwin)
- GitHub Check: Build (armv7-linux-androideabi)
- GitHub Check: Build (aarch64-apple-ios-sim)
- GitHub Check: Build (aarch64-apple-ios)
- GitHub Check: Build (aarch64-unknown-linux-gnu)
- GitHub Check: Build (aarch64-linux-android)
- GitHub Check: Build (aarch64-pc-windows-msvc)
- GitHub Check: Build (aarch64-apple-darwin)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Build (x86_64-apple-darwin)
- GitHub Check: Build (x86_64-linux-android)
🔇 Additional comments (13)
livekit-api/src/signal_client/mod.rs (1)
487-489: LGTM: clean addition following the existing pattern.
The `client_protocol=1` parameter is appended correctly and the inline comment clarifies intent. One minor suggestion: consider defining a constant (similar to `PROTOCOL_VERSION` on line 50) if additional protocol capability flags are anticipated, but it's fine as-is for a single use.
livekit/Cargo.toml (1)
47-47: LGTM: zstd dependency addition looks appropriate.
The `zstd` crate at version `0.13` is suitable for the compression needs described in this PR.
livekit-api/src/signal_client/signal_stream.rs (1)
66-69: LGTM: `RawBytes` variant and `send_raw` follow the existing `Signal`/`send` pattern consistently.
The new `RawBytes` message variant, the `send_raw` public method, and its handler in `write_task` all mirror the existing `Signal` path cleanly.
Also applies to: 344-351, 371-378
livekit/src/room/participant/mod.rs (1)
94-94: LGTM: `client_protocol` is consistently threaded through the participant model.
The field is added to `ParticipantInfo`, exposed via `enum_dispatch`, passed through `new_inner`, and updated in `update_info`. All consistent.
Also applies to: 136-137, 186-186, 203-203, 253-255
livekit/src/rtc_engine/rtc_session.rs (1)
1176-1187: Request decompression fallback looks good.
Falling back to the uncompressed `rpc_request.payload` on decompression failure is a reasonable degradation strategy.
livekit/src/room/participant/remote_participant.rs (1)
88-101: LGTM: `client_protocol` parameter and accessor mirror the `LocalParticipant` pattern.
Also applies to: 523-525
livekit/src/room/participant/rpc.rs (1)
149-180: Compression utility looks correct and well-guarded.
Threshold check, size comparison after compression, and graceful fallback on error are all solid.
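The three guards named in that comment (threshold check, size comparison after compression, fallback on error) can be sketched independently of any codec. This is an illustrative sketch, not the PR's `compress_rpc_payload_bytes`: the function name, the 1 KB threshold constant (taken from the PR description's ">= 1KB"), and the pluggable `compress` closure are assumptions; in the PR the compressor is Zstd.

```rust
use std::io;

/// Assumed threshold; the PR description mentions compressing messages of 1 KB or more.
const COMPRESSION_THRESHOLD_BYTES: usize = 1024;

/// Returns `Some(compressed)` only when compression is attempted, succeeds,
/// and actually shrinks the payload; otherwise `None`, meaning the caller
/// should send the payload uncompressed.
fn compress_if_beneficial<F>(payload: &str, compress: F) -> Option<Vec<u8>>
where
    F: FnOnce(&[u8]) -> io::Result<Vec<u8>>,
{
    // Guard 1: small payloads are not worth compressing.
    if payload.len() < COMPRESSION_THRESHOLD_BYTES {
        return None;
    }
    match compress(payload.as_bytes()) {
        // Guard 2: keep the compressed form only if it is smaller.
        Ok(compressed) if compressed.len() < payload.len() => Some(compressed),
        // Guard 3: on codec error (or no gain), fall back to uncompressed.
        _ => None,
    }
}
```

Returning `Option` rather than `Result` keeps the fallback decision in one place: the caller never needs to inspect a compression error to decide what to send.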
livekit/src/room/mod.rs (2)
513-513: LGTM: `client_protocol` is consistently threaded from `ParticipantInfo` through all participant creation paths.
The join flow, participant update flow, and `create_participant` all correctly propagate `pi.client_protocol`.
Also applies to: 667-667, 1031-1031, 1639-1652
1788-1788: Visibility change to `pub(crate)` is appropriate.
This enables other modules within the crate (e.g., `local_participant`) to look up remote participants for compression support checks.
livekit/src/room/participant/local_participant.rs (4)
623-660: LGTM!
The compression logic for RPC requests is clean: it correctly falls back to the uncompressed payload when compression isn't beneficial or the destination doesn't support it.
662-707: LGTM!
Response compression logic mirrors the request path and correctly uses the `Value` enum variants.
991-1007: LGTM!
Clean restructuring of the response handling into a `Result` before sending through the channel.
610-621: The `client_protocol >= 1` threshold is correct and intentional.
The codebase documentation explicitly states that `client_protocol=1` indicates support for RPC compression, and the signal client always sends `client_protocol=1` when connecting. Clients with `client_protocol=0` do not support compression, so the check correctly identifies compression-capable participants. No change needed.
Likely an incorrect or invalid review comment.
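A capability check of the kind described in that comment might look like the sketch below. The struct, constant name, and function name are hypothetical stand-ins, and treating an unknown destination as "no compression" is an assumption made here for backward compatibility, not something the review confirms.

```rust
/// Minimum `client_protocol` value that signals RPC compression support,
/// per the review comment (`client_protocol >= 1`). The constant name is hypothetical.
const CLIENT_PROTOCOL_RPC_COMPRESSION: i32 = 1;

/// Hypothetical stand-in for the participant info tracked by the SDK.
struct ParticipantInfo {
    client_protocol: i32,
}

/// Returns true only when the destination is known and advertises a
/// protocol version that includes RPC compression. Unknown destinations
/// are treated as not supporting it (an assumption of this sketch).
fn supports_rpc_compression(destination: Option<&ParticipantInfo>) -> bool {
    destination.map_or(false, |p| p.client_protocol >= CLIENT_PROTOCOL_RPC_COMPRESSION)
}
```

Using `>=` rather than `==` means later protocol versions keep compression enabled without another code change.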
livekit/src/room/participant/local_participant.rs, lines 37-39:
    room::participant::rpc::{
        compress_rpc_payload_bytes, RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES,
    },
CI is failing due to formatting issues.
The pipeline reports that `cargo fmt -- --check` detected formatting differences. Run `cargo fmt` to fix.
livekit/src/room/participant/rpc.rs, lines 185-201:
    pub fn decompress_rpc_payload_bytes(compressed: &[u8]) -> Result<String, String> {
        use std::io::Cursor;

        match zstd::decode_all(Cursor::new(compressed)) {
            Ok(decompressed) => match String::from_utf8(decompressed) {
                Ok(s) => {
                    Ok(s)
                }
                Err(e) => {
                    Err(format!("Failed to decode decompressed RPC payload as UTF-8: {}", e))
                }
            },
            Err(e) => {
                Err(format!("Failed to decompress RPC payload: {}", e))
            }
        }
    }
No decompressed size limit — potential for memory exhaustion from a malicious payload.
zstd::decode_all will allocate unbounded memory for the decompressed output. A crafted compressed payload (zip-bomb style) could decompress to gigabytes, causing OOM. Consider capping the decompressed size to MAX_PAYLOAD_BYTES or a reasonable multiple:
Proposed fix using a bounded read
 pub fn decompress_rpc_payload_bytes(compressed: &[u8]) -> Result<String, String> {
-    use std::io::Cursor;
+    use std::io::{Cursor, Read};
+
+    // Cap decompressed size to prevent zip-bomb style attacks
+    const MAX_DECOMPRESSED_SIZE: usize = MAX_PAYLOAD_BYTES + 1024; // small margin over max payload

-    match zstd::decode_all(Cursor::new(compressed)) {
-        Ok(decompressed) => match String::from_utf8(decompressed) {
+    let decoder = zstd::Decoder::new(Cursor::new(compressed))
+        .map_err(|e| format!("Failed to create zstd decoder: {}", e))?;
+    let mut limited = decoder.take(MAX_DECOMPRESSED_SIZE as u64 + 1);
+    let mut buf = Vec::new();
+    match limited.read_to_end(&mut buf) {
+        Ok(n) if n > MAX_DECOMPRESSED_SIZE => {
+            Err(format!("Decompressed RPC payload exceeds max size of {} bytes", MAX_DECOMPRESSED_SIZE))
+        }
+        Ok(_) => match String::from_utf8(buf) {
             Ok(s) => Ok(s),
             Err(e) => Err(format!("Failed to decode decompressed RPC payload as UTF-8: {}", e)),
         },
         Err(e) => Err(format!("Failed to decompress RPC payload: {}", e)),
     }
 }
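The bounded-read idea in the proposed fix does not depend on Zstd: any `std::io::Read` source can be capped with `Read::take`. The sketch below isolates that technique using only the standard library; `read_capped` is a hypothetical helper name, and in the PR the reader would be the streaming Zstd decoder rather than a plain byte slice.

```rust
use std::io::Read;

/// Reads at most `cap` bytes from `reader`, returning an error if the
/// source holds more. Reading `cap + 1` bytes is what lets us tell
/// "exactly at the cap" apart from "over the cap" without buffering
/// the whole stream.
fn read_capped<R: Read>(reader: R, cap: usize) -> Result<Vec<u8>, String> {
    let mut limited = reader.take(cap as u64 + 1);
    let mut buf = Vec::new();
    limited
        .read_to_end(&mut buf)
        .map_err(|e| format!("Failed to read payload: {}", e))?;
    if buf.len() > cap {
        return Err(format!("Payload exceeds max size of {} bytes", cap));
    }
    Ok(buf)
}
```

Because the read stops after `cap + 1` bytes, even an effectively endless source is rejected after a bounded allocation.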
livekit/src/rtc_engine/rtc_session.rs, lines 1201-1209:
    Some(proto::rpc_response::Value::CompressedPayload(compressed)) => {
        match decompress_rpc_payload_bytes(&compressed) {
            Ok(decompressed) => (Some(decompressed), None),
            Err(e) => {
                log::error!("Failed to decompress RPC response payload: {}", e);
                (None, None)
            }
        }
    }
Decompression failure silently yields an empty response — consider surfacing an error.
When CompressedPayload decompression fails, (None, None) is emitted, meaning the caller receives an RpcResponse with no payload and no error. This is ambiguous — the caller can't distinguish "empty successful response" from "decompression failed." Consider synthesizing an RpcError so the caller knows something went wrong:
Proposed fix
 Some(proto::rpc_response::Value::CompressedPayload(compressed)) => {
     match decompress_rpc_payload_bytes(&compressed) {
         Ok(decompressed) => (Some(decompressed), None),
         Err(e) => {
             log::error!("Failed to decompress RPC response payload: {}", e);
-            (None, None)
+            (None, Some(proto::RpcError {
+                code: 1500, // ApplicationError
+                message: format!("Failed to decompress response: {}", e),
+                data: String::new(),
+            }))
         }
     }
 }
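The difference between the two outcomes can be shown in isolation. In the sketch below, `RpcError` is a hypothetical stand-in whose fields mirror the proposed fix, `response_outcome` is an illustrative helper name, and the 1500 code is the value used in the proposed fix; none of this is the SDK's actual type.

```rust
/// Hypothetical stand-in for the RPC error type; fields mirror the proposed fix.
#[derive(Debug, PartialEq)]
struct RpcError {
    code: u32,
    message: String,
    data: String,
}

/// Error code used in the proposed fix (commented there as ApplicationError).
const APPLICATION_ERROR: u32 = 1500;

/// Maps a decompression result to the (payload, error) pair handed to the
/// caller, so a failure is never reported as (None, None).
fn response_outcome(decompressed: Result<String, String>) -> (Option<String>, Option<RpcError>) {
    match decompressed {
        Ok(payload) => (Some(payload), None),
        Err(e) => (
            None,
            Some(RpcError {
                code: APPLICATION_ERROR,
                message: format!("Failed to decompress response: {}", e),
                data: String::new(),
            }),
        ),
    }
}
```

With this mapping, `(None, None)` can only mean a response that genuinely carried no payload and no error.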
ladvoc left a comment:
LGTM ✅, and thank you for removing some of the unnecessary clones!
    Err(_) => {
        log::error!(
            "RPC error code {}: Connection timeout waiting for ACK (timeout: {:?}), request_id: {}, method: {}, destination: {}",
            RpcErrorCode::ConnectionTimeout as u32,
comment: RpcErrorCode::message allows getting a description of the error. Not sure if that's preferable here, but just wanted to point it out.
    /// Compress an RPC payload to raw bytes using Zstd.
    /// Returns Some(compressed_bytes) if compression is beneficial, None otherwise.
    /// This is used with the new `compressed_payload` proto field (no base64 overhead).
    pub fn compress_rpc_payload_bytes(payload: &str) -> Option<Vec<u8>> {
comment: It might be a good idea to have simple unit tests for these.
This PR does the following:
1. Uses `client_protocol` in the ParticipantInfo to tell whether the remote participant supports compression.
2. If compression is supported, compresses RPC messages that are >= 1 KB using zstd.
3. If `client_protocol` indicates compression support, the RPC response is compressed as well.
From my tests:
The compression ratio is 3.2:1 to 3.5:1 (~4.5 KB compressed from 15 KB).
Against the staging endpoint, I measured:
┌───────────────────────┬──────────────┬────────────┐
│ Metric │ Uncompressed │ Compressed │
├───────────────────────┼──────────────┼────────────┤
│ Payload per direction │ 15 KB │ ~4.6 KB │
├───────────────────────┼──────────────┼────────────┤
│ Round-trip data │ 30 KB │ ~9.2 KB │
├───────────────────────┼──────────────┼────────────┤
│ Avg latency │ 550 ms │ 372 ms │
└───────────────────────┴──────────────┴────────────┘
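Assuming latency is a fixed overhead plus a per-KB transfer cost (with the compressed round trip rounded to 9 KB):
Uncompressed: 550 ms = Fixed + (30 KB × transfer_rate)
Compressed: 372 ms = Fixed + (9 KB × transfer_rate)
Difference: 178 ms = 21 KB × transfer_rate
→ transfer_rate ≈ 8.5 ms/KB
→ 30 KB transfer ≈ 255 ms
→ Fixed overhead = 550 - 255 ≈ 295 ms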
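The estimate above is a two-point linear fit of `latency = fixed + rate × size`. A small sketch that reproduces the arithmetic follows; `fit_linear` is a hypothetical helper, and the linear model itself is the description's assumption rather than a measured property of the network.

```rust
/// Solves `latency_ms = fixed_ms + rate_ms_per_kb * size_kb` from two
/// (size_kb, latency_ms) samples. Returns (rate_ms_per_kb, fixed_ms).
fn fit_linear(a: (f64, f64), b: (f64, f64)) -> (f64, f64) {
    // Slope: extra latency per extra KB transferred.
    let rate = (a.1 - b.1) / (a.0 - b.0);
    // Intercept: latency that remains when nothing is transferred.
    let fixed = a.1 - rate * a.0;
    (rate, fixed)
}
```

Calling `fit_linear((30.0, 550.0), (9.0, 372.0))` gives a rate of about 8.48 ms/KB and a fixed overhead of about 295.7 ms; the figure of 295 ms above comes from rounding the rate to 8.5 before subtracting.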