From 14e0ded7a5e67c4ba66010827338c6258aa10489 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 9 Feb 2026 17:53:26 -0800 Subject: [PATCH 1/2] use a real LLM response and save it to data_rpc_test_data.txt, use it for the rpc stress tests --- data/rpc_test_data.txt | 347 ++++++++++++++++ src/tests/stress/test_rpc_stress.cpp | 581 +++++++++++++++++++++++---- 2 files changed, 849 insertions(+), 79 deletions(-) create mode 100644 data/rpc_test_data.txt diff --git a/data/rpc_test_data.txt b/data/rpc_test_data.txt new file mode 100644 index 0000000..f083a54 --- /dev/null +++ b/data/rpc_test_data.txt @@ -0,0 +1,347 @@ +Here are the main issues / foot-guns I see in that snippet, grouped by severity. + +1) Input validation & type safety problems + +Blindly trusting $body[...] shapes. +participant_attributes, participant_metadata, room_config are assumed to be the correct types. If a client sends "participant_attributes": "lol" you’ll pass a string into setAttributes() and may get a runtime error or (worse) unexpected serialization. + +Fix: explicitly validate types: + +participant_identity, participant_name, room_name → strings, non-empty, length capped + +participant_metadata → string (or JSON string, depending on SDK expectation) + +participant_attributes → associative array of strings + +room_config → array / specific schema expected by SDK + +!empty() is the wrong check for some fields. +empty() treats "0", 0, false, [] as empty. If someone intentionally sets metadata to "0" you’ll skip it. + +Fix: use array_key_exists() / isset() + type checks instead. + +No bounds on identity/name/metadata sizes. +A client can send megabytes of metadata/attributes and you’ll happily embed it into a JWT → big CPU + big response + possible gateway/proxy issues. + +Fix: enforce max lengths (identity/name/metadata) and max attribute count/size. + +2) Security / abuse concerns + +Unauthenticated token minting endpoint (likely). +If this is exposed publicly without auth/rate limiting, anyone can mint tokens and join any room name they choose (including “admin-ish” room names), and they can set arbitrary identity/name/metadata/attributes. + +Fix: require auth (session cookie, API key, JWT from your app, etc.) + rate limit + allowlist/validate room names and identities. + +Identity spoofing. +Because identity comes from the request body, a malicious client can claim to be another user (participant_identity: "alice"). + +Fix: identity/name should come from your authenticated user context, not from client input. + +Room name injection / namespace collisions. +Letting clients pick arbitrary room_name can cause collisions or unauthorized access patterns. + +Fix: server decides the room or validates it against what the authenticated user is allowed to join. + +3) Error handling & operational problems + +Missing checks for env vars. +If LIVEKIT_API_KEY, LIVEKIT_API_SECRET, or LIVEKIT_URL are missing, you’ll mint invalid tokens or return bad data without a clear error. + +Fix: validate envs and return 500 with a clear message (don’t leak secrets). + +No try/catch around SDK calls. +->toJwt() and some setters can throw. As-is, you may return HTML/500 with no JSON body. + +Fix: wrap token generation in try { ... } catch (\Throwable $e) { ... }. + +No response headers. +You’re returning JSON but not setting Content-Type: application/json. + +Fix: header('Content-Type: application/json'); (and ideally charset). + +json_decode without checking for non-object JSON. +If the request body is valid JSON but not an object (e.g. 
[] or "hi"), $body['room_name'] will emit warnings because $body isn’t an array. + +Fix: after decode, ensure is_array($body). + +4) Grants / permissions clarity + +Grant is “join room” only, but still potentially too broad. +setRoomJoin() allows joining the room; depending on defaults, users can publish/subscribe. That may be fine, but many apps want finer control. + +Fix: set explicit publish/subscribe/record/admin permissions if your SDK supports it and your use case needs it. + +RoomConfig trust boundary. +If room_config can influence server-side room behavior, accepting it from clients is risky unless you strictly validate/allowlist. + +Fix: server-owned config only, or validate against a safe subset. + +5) Minor correctness / style nits + +Port 3000 comment is meaningless in PHP. +PHP typically runs behind a web server or built-in server; the code can’t “make sure it’s on port 3000.” This is likely copy/paste from a Node quickstart. + +Potential JWT bloat. +Attributes/metadata/config all contribute to token size. Large JWTs can exceed header limits if later used in headers, or simply become unwieldy. + +A “safer skeleton” checklist (no big rewrite) + +If you want quick improvements without changing the structure: + +Set headers: Content-Type: application/json + +Ensure $body is an array: if (!is_array($body)) ... + +Validate env vars are present + +Validate/normalize inputs (string lengths, allowed characters, attribute schema) + +Don’t take identity/name from the client; derive from auth + +Wrap minting in try/catch and return JSON errors + +Add auth + rate limiting + +If you tell me what LiveKit PHP SDK version you’re using and what types it expects for setMetadata, setAttributes, and setRoomConfig, I can point out the exact type mismatches and suggest concrete validation rules. + + +Yes — an intentional reconnect (especially if it wasn’t fully handled / “reconnected” wasn’t applied cleanly) can explain exactly that pattern: B is publishing audio, C hears it, A doesn’t; A still sees B’s video; everyone else is fine. + +Here are the most common mechanisms that produce that “A can’t hear B, but everything else works” symptom, and how they relate to reconnect / missing handling. + +1) A’s receiver-side subscription for B’s audio got dropped or stuck + +After reconnect/resume, the SDK often has to re-sync: + +which tracks A is subscribed to, + +which track IDs/SIDs are current, + +and the receiver pipeline for each track. + +If the reconnect path misses “re-apply subscriptions” (or misses the audio subset), you can get: + +B’s video subscribed correctly (so A sees B), + +B’s audio not subscribed / not attached / not resumed (so A hears nothing), + +while C successfully re-subscribed (so C hears B). + +What you’d see in logs (often on A’s side, not B’s): + +track subscribed/unsubscribed events for B audio missing + +“muted”/“enabled=false”/“track not attached” for audio only + +receiver stats: video inbound bytes increasing; audio inbound bytes ~0 + +2) A is receiving B’s audio RTP, but decrypt/MLS state is wrong for that one stream + +If you’re using end-to-end encryption / MLS, a reconnect/desync can produce a selective decrypt failure: + +video might decrypt (different key usage / timing / SSRC mapping / separate sender keys) + +audio might fail decrypt (or fail key lookup) → silence + +other participants still fine (they have correct epoch/keys) + +This matches “C hears B, A doesn’t” because only A is out of sync. 
+ +What you’d see: + +on A: “cannot decrypt frame”, “unknown key”, “epoch mismatch”, “discarding packet” for audio SSRC + +on B: usually nothing (B is just sending) + +on C: normal decrypt / no errors + +3) Track identity changed across reconnect and A is still bound to the old audio track + +An intentional reconnect can result in: + +B’s audio track being republished (new track SID / new transceiver / new SSRC), + +but A’s app logic or state machine still pointing at the old one. + +Result: + +UI shows B present + video (new video track handled) + +audio element for B is still bound to the old track (or never attached) + +C happened to bind to the new track + +Clues: + +two different audio track SIDs for B around the reconnect + +“unpublished old audio track” followed by “published new audio track” + +A never logs “subscribed to new audio track” + +4) Audio receiver exists but is muted/disabled only on A due to state not re-applied + +During reconnect, some SDKs re-create track objects; app code that manages mute/unmute can accidentally: + +keep B’s audio track muted on A, + +while video remains enabled. + +Clues: + +on A: audio track enabled=false, muted=true, or volume set to 0 + +no inbound audio level, but subscription exists + +Why “B-side logs only” often can’t prove this + +Because the failure is usually on A’s receiving path: + +B is publishing and C hears it → B’s publisher is probably fine. + +The divergence is: A’s subscription/decrypt/attach path is broken. + +So B’s logs might show reconnect happened, but to explain “A can’t hear B” you almost always need: + +A-side track events + inbound-rtp stats + (if E2EE) decrypt logs. + +What to check quickly (high signal) + +If you can grab these, it will pinpoint which bucket you’re in: + +On participant A (receiver) + +Did A ever log Subscribed to B’s audio track (track SID)? + +Are inbound RTP audio bytes/packets increasing for B’s audio SSRC? + +Any decrypt/MLS errors tied to audio SSRC or key epoch? + +Is the audio track muted/disabled or not attached to an audio element? + +On participant B (publisher) + +Did B’s audio track get republished (new track SID) around reconnect? + +Any “audio track ended/restarted” events? + +Direct answer to your question + +Intentional reconnect + missing reconnected handling absolutely can explain this behavior, most commonly by leaving A’s audio subscription or receiver pipeline out of sync while video stays fine. + +If MLS/E2EE is in play, it can also explain it via a selective decrypt/key mismatch on A for B’s audio stream. + +If you paste: + +the reconnect section from B’s logs, and + +the same time window from A’s logs (even shorter), +I can tell you which of the above is most likely and what exact log lines to look for next. + +Here’s a small, practical pattern that works well for 100+ rooms with autoSubscribe:false and survives reconnect/resume without “A can’t hear B” type drift. + +Core idea + +Keep a single source of truth for “who should I be subscribed to right now?” + +On any event that can desync state (reconnect/resumed, participant joined/left, tracks published/unpublished, active speaker change), re-apply that desired subscription set. + +You don’t need to persist “current subscription state” perfectly — you can recompute desired state and call subscribe/unsubscribe idempotently. 
+ +Minimal JS/TS snippet +import { Room, RoomEvent, Track } from "livekit-client"; + +const room = new Room({ + autoSubscribe: false, + // ...other options +}); + +// Your policy knobs +const MAX_AUDIO_SUBS = 12; // keep small for 100+ rooms +const pinnedIdentities = new Set(); // optional: user pins + +function desiredAudioPublishers(): string[] { + // 1) Pins always included + const pins = [...pinnedIdentities]; + + // 2) Active speakers next (Room keeps this list updated) + const speakers = room.activeSpeakers + .map(p => p.identity) + .filter(Boolean); + + // Merge, preserve order, cap + const ordered = [...new Set([...pins, ...speakers])]; + return ordered.slice(0, MAX_AUDIO_SUBS); +} + +function applyAudioSubscriptions() { + const wanted = new Set(desiredAudioPublishers()); + + for (const [, p] of room.remoteParticipants) { + // Find this participant’s mic publication (if any) + const micPub = [...p.audioTrackPublications.values()].find(pub => + pub.source === Track.Source.Microphone + ); + if (!micPub) continue; + + const shouldSub = wanted.has(p.identity); + + // Idempotent: calling setSubscribed repeatedly is fine + if (micPub.isSubscribed !== shouldSub) { + micPub.setSubscribed(shouldSub); + } + } +} + +// Re-apply policy on anything that can change reality/state +room + .on(RoomEvent.Connected, applyAudioSubscriptions) + .on(RoomEvent.Reconnected, applyAudioSubscriptions) // WS/RTC recovered + .on(RoomEvent.Resumed, applyAudioSubscriptions) // if using resume + .on(RoomEvent.ParticipantConnected, applyAudioSubscriptions) + .on(RoomEvent.ParticipantDisconnected, applyAudioSubscriptions) + .on(RoomEvent.TrackPublished, applyAudioSubscriptions) + .on(RoomEvent.TrackUnpublished, applyAudioSubscriptions) + .on(RoomEvent.ActiveSpeakersChanged, applyAudioSubscriptions); + +Do you need to track participant list / subscription state? + +You need less than you think: + +Participant list + +No — the room.remoteParticipants map is your participant list. You can iterate it whenever you re-apply. + +Subscription state + +Not really. + +You can compute desired state and call setSubscribed(true/false). + +Checking pub.isSubscribed is only to avoid spamming calls; it’s optional. + +What you do want to track + +Policy inputs you own: + +pinnedIdentities (if you support pins) + +maybe a “stage” list / visible tiles list + +MAX_AUDIO_SUBS + +Everything else can be derived from the room. + +Why this helps your “reconnect caused selective audio loss” case + +If a reconnect/resume accidentally leaves some audio subscriptions “off” (or bound to old track pubs), calling applyAudioSubscriptions() after Reconnected/Resumed/TrackPublished forces the client back to the correct state. + +This is the key: treat reconnect as “my local state might be wrong; re-sync everything.” + +Two small extras that prevent common gotchas + +Handle track SID changes +On reconnect, you can see new publications. Hooking TrackPublished and reapplying covers this. + +Make sure your “identity” is stable +Use participant.identity (string) rather than SIDs that might change between sessions. 
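If the "A can't hear B" state shows up again, the fastest way to tell which bucket you're in is to sample A's receiver-side inbound-rtp stats for B's audio, as listed under "What to check quickly (high signal)" above. Below is a minimal sketch, not a drop-in implementation: it assumes you can obtain the RTCRtpReceiver behind B's remote audio track from your SDK (how you get it depends on the SDK version); the stats calls and fields used (getStats, inbound-rtp, bytesReceived, packetsReceived) are standard WebRTC.

// Hedged sketch: sample inbound audio stats twice on participant A's side.
// `receiver` is assumed to be the RTCRtpReceiver behind B's remote audio track.
async function probeInboundAudio(receiver: RTCRtpReceiver, intervalMs = 2000) {
  const read = async () => {
    let bytes = 0;
    let packets = 0;
    const report = await receiver.getStats();
    report.forEach(stat => {
      if (stat.type === "inbound-rtp" && stat.kind === "audio") {
        bytes = stat.bytesReceived ?? 0;
        packets = stat.packetsReceived ?? 0;
      }
    });
    return { bytes, packets };
  };

  const first = await read();
  await new Promise(resolve => setTimeout(resolve, intervalMs));
  const second = await read();

  const deltaBytes = second.bytes - first.bytes;
  const deltaPackets = second.packets - first.packets;
  console.log(`inbound audio over ${intervalMs}ms: ${deltaBytes} bytes, ${deltaPackets} packets`);
  return { deltaBytes, deltaPackets };
}

Bytes near zero point at the subscription / republished-track buckets (1 and 3); bytes flowing while A still hears silence point at the decrypt / attach / mute buckets (2 and 4).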
diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index b941e42..f3ecd5d 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -40,7 +41,7 @@ using namespace std::chrono_literals; constexpr size_t kMaxRpcPayloadSize = 15 * 1024; // Default stress test duration in seconds (can be overridden by env var) -constexpr int kDefaultStressDurationSeconds = 3600; // 1 hour +constexpr int kDefaultStressDurationSeconds = 600; // 10 minutes // Test configuration from environment variables struct RpcStressTestConfig { @@ -167,49 +168,121 @@ class StressTestStats { std::map error_counts_; }; -// Sample sentences for generating compressible payloads -static const std::vector kSampleSentences = { - "The quick brown fox jumps over the lazy dog. ", - "LiveKit is a real-time communication platform for building video and " - "audio applications. ", - "RPC allows participants to call methods on remote peers with " - "request-response semantics. ", - "This stress test measures the performance and reliability of the RPC " - "system under load. ", - "WebRTC enables peer-to-peer communication for real-time media streaming. ", - "The payload is compressed using Zstd to reduce bandwidth and improve " - "throughput. ", - "Data channels provide reliable or unreliable delivery of arbitrary " - "application data. ", - "Participants can publish audio and video tracks to share media with " - "others in the room. ", - "The signaling server coordinates connection establishment between peers. ", - "End-to-end encryption ensures that media content is only accessible to " - "participants. ", -}; +// Path to test data file (relative to repo root) +static const char *kTestDataFile = "data/rpc_test_data.txt"; + +// Loaded test data lines +static std::vector gTestDataLines; +static std::once_flag gTestDataLoadFlag; + +// Load test data from file +void loadTestData() { + std::call_once(gTestDataLoadFlag, []() { + // Try to find the data file relative to different possible working + // directories + std::vector search_paths = { + kTestDataFile, + std::string("../") + kTestDataFile, + std::string("../../") + kTestDataFile, + std::string("../../../") + kTestDataFile, + }; + + std::ifstream file; + for (const auto &path : search_paths) { + file.open(path); + if (file.is_open()) { + std::cout << "Loaded test data from: " << path << std::endl; + break; + } + } -// Generate a payload of specified size using repeating sentences (compressible) + if (!file.is_open()) { + std::cerr << "Warning: Could not find " << kTestDataFile + << ", using fallback test data" << std::endl; + // Fallback to some default lines if file not found + gTestDataLines = { + "This is a fallback test line for RPC stress testing.", + "The test data file could not be found in the expected location.", + "Please ensure data/rpc_test_data.txt exists in the repository.", + }; + return; + } + + std::string line; + while (std::getline(file, line)) { + if (!line.empty()) { + gTestDataLines.push_back(line); + } + } + file.close(); + + std::cout << "Loaded " << gTestDataLines.size() << " lines of test data" + << std::endl; + }); +} + +// Truncate a string at a valid UTF-8 boundary, then pad with spaces to exact +// size +std::string truncateUtf8AndPad(const std::string &str, size_t target_size) { + if (str.size() <= target_size) { + // Pad with spaces to reach target size + std::string result = str; + 
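+    // ASCII spaces are one byte in UTF-8, so padding keeps the payload valid.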
result.append(target_size - str.size(), ' '); + return result; + } + + // Find the last valid UTF-8 character boundary before target_size + size_t pos = target_size; + while (pos > 0) { + unsigned char c = static_cast(str[pos]); + // UTF-8 continuation bytes start with 10xxxxxx (0x80-0xBF) + // If we're at a continuation byte, move back + if ((c & 0xC0) != 0x80) { + // This is either ASCII or the start of a multi-byte sequence + break; + } + pos--; + } + + std::string result = str.substr(0, pos); + // Pad with spaces to reach exact target size + if (result.size() < target_size) { + result.append(target_size - result.size(), ' '); + } + return result; +} + +// Generate a payload of specified size using random lines from test data +// This produces realistic text that doesn't compress as artificially well as +// repeated sentences std::string generateRandomPayload(size_t size) { + loadTestData(); + static thread_local std::random_device rd; static thread_local std::mt19937 gen(rd()); - static std::uniform_int_distribution dis(0, - kSampleSentences.size() - 1); - std::string result; - result.reserve(size); + if (gTestDataLines.empty()) { + // Should not happen, but return empty string if no data + return std::string(size, 'x'); + } + + std::uniform_int_distribution dis(0, gTestDataLines.size() - 1); - // Start with a random sentence to add some variation between payloads - size_t start_idx = dis(gen); + std::string result; + result.reserve(size + 100); // Extra space for potential truncation + // Build payload from random lines while (result.size() < size) { - // Cycle through sentences starting from a random position - const std::string &sentence = - kSampleSentences[(start_idx + result.size()) % kSampleSentences.size()]; - result += sentence; + size_t line_idx = dis(gen); + const std::string &line = gTestDataLines[line_idx]; + if (!result.empty()) { + result += "\n"; + } + result += line; } - // Trim to exact size - return result.substr(0, size); + // Truncate at valid UTF-8 boundary and pad to exact size + return truncateUtf8AndPad(result, size); } // Wait for a remote participant to appear @@ -260,6 +333,11 @@ TEST_F(RpcStressTest, MaxPayloadStress) { receiver_room->Connect(config_.url, config_.receiver_token, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + auto receiver_info = receiver_room->room_info(); + std::cout << "Receiver connected - Room: " << receiver_info.name + << " (SID: " << receiver_info.sid.value_or("unknown") << ")" + << std::endl; + std::string receiver_identity = receiver_room->localParticipant()->identity(); std::atomic total_received{0}; @@ -270,13 +348,8 @@ TEST_F(RpcStressTest, MaxPayloadStress) { [&total_received]( const RpcInvocationData &data) -> std::optional { total_received++; - // Return checksum of payload for verification - size_t checksum = 0; - for (char c : data.payload) { - checksum += static_cast(c); - } - return std::to_string(data.payload.size()) + ":" + - std::to_string(checksum); + // Echo the payload back for round-trip verification + return data.payload; }); // Create caller room @@ -285,6 +358,11 @@ TEST_F(RpcStressTest, MaxPayloadStress) { caller_room->Connect(config_.url, config_.caller_token, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + auto caller_info = caller_room->room_info(); + std::cout << "Caller connected - Room: " << caller_info.name + << " (SID: " << caller_info.sid.value_or("unknown") << ")" + << std::endl; + bool receiver_visible = waitForParticipant(caller_room.get(), 
receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -322,16 +400,23 @@ TEST_F(RpcStressTest, MaxPayloadStress) { std::chrono::duration(call_end - call_start) .count(); - // Verify response - std::string expected_response = std::to_string(kMaxRpcPayloadSize) + - ":" + - std::to_string(expected_checksum); + // Verify response by comparing checksum (more efficient than full + // comparison) + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } - if (response == expected_response) { + if (response.size() == payload.size() && + response_checksum == expected_checksum) { stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); } else { stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); stats.recordError("checksum_mismatch"); + std::cerr << "[CHECKSUM MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; } } catch (const RpcError &e) { auto call_end = std::chrono::high_resolution_clock::now(); @@ -341,6 +426,11 @@ TEST_F(RpcStressTest, MaxPayloadStress) { stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); auto code = static_cast(e.code()); + std::cerr << "[RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" + << " latency=" << latency_ms << "ms" << std::endl; + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { stats.recordError("timeout"); } else if (code == RpcError::ErrorCode::CONNECTION_TIMEOUT) { @@ -353,6 +443,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { } catch (const std::exception &e) { stats.recordCall(false, 0, kMaxRpcPayloadSize); stats.recordError("exception"); + std::cerr << "[EXCEPTION] " << e.what() << std::endl; } // Small delay between calls to avoid overwhelming @@ -413,6 +504,212 @@ TEST_F(RpcStressTest, MaxPayloadStress) { receiver_room.reset(); } +// Small payload stress test - fits in single SCTP chunk (no fragmentation) +// SCTP MTU is ~1200 bytes, so we use 1000 bytes to leave room for headers +TEST_F(RpcStressTest, SmallPayloadStress) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + // Small payload that fits in single SCTP chunk (no fragmentation overhead) + constexpr size_t kSmallPayloadSize = 1000; + + std::cout << "\n=== RPC Small Payload Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.duration_seconds << " seconds" + << std::endl; + std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; + std::cout << "Payload size: " << kSmallPayloadSize + << " bytes (single SCTP chunk)" << std::endl; + + // Create receiver room + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + auto receiver_info = receiver_room->room_info(); + std::cout << "Receiver connected - Room: " << receiver_info.name + << " (SID: " << receiver_info.sid.value_or("unknown") << ")" + << std::endl; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + + // Register RPC handler that echoes small payloads + receiver_room->localParticipant()->registerRpcMethod( + "small-payload-stress", + [&total_received]( + const 
RpcInvocationData &data) -> std::optional { + total_received++; + // Echo the payload back for round-trip verification + return data.payload; + }); + + // Create caller room + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + auto caller_info = caller_room->room_info(); + std::cout << "Caller connected - Room: " << caller_info.name + << " (SID: " << caller_info.sid.value_or("unknown") << ")" + << std::endl; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + std::cout << "Both rooms connected. Starting small payload stress test..." + << std::endl; + + StressTestStats stats; + std::atomic running{true}; + std::atomic packet_counter{0}; + + // Helper to get current timestamp in microseconds since epoch + auto get_timestamp_us = []() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + }; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.duration_seconds); + + // Create caller threads + std::vector caller_threads; + for (int t = 0; t < config_.num_caller_threads; ++t) { + caller_threads.emplace_back([&, thread_id = t]() { + while (running.load()) { + int pkt_id = packet_counter.fetch_add(1); + + // Use small payload that fits in single SCTP chunk + std::string payload = generateRandomPayload(kSmallPayloadSize); + + // Calculate expected checksum + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + + auto call_start = std::chrono::high_resolution_clock::now(); + auto send_ts = get_timestamp_us(); + + std::cerr << "[LATENCY] SENDER pkt=" << pkt_id + << " SEND_START ts=" << send_ts << std::endl; + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "small-payload-stress", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + auto recv_ts = get_timestamp_us(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + std::cerr << "[LATENCY] SENDER pkt=" << pkt_id + << " RESPONSE_RECEIVED ts=" << recv_ts + << " latency_ms=" << latency_ms << std::endl; + + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { + stats.recordCall(true, latency_ms, kSmallPayloadSize); + } else { + stats.recordCall(false, latency_ms, kSmallPayloadSize); + stats.recordError("checksum_mismatch"); + } + } catch (const RpcError &e) { + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + stats.recordCall(false, latency_ms, kSmallPayloadSize); + + auto code = static_cast(e.code()); + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + stats.recordError("timeout"); + } else if (code == RpcError::ErrorCode::CONNECTION_TIMEOUT) { + stats.recordError("connection_timeout"); + } else { + stats.recordError("rpc_error_" + std::to_string(e.code())); + } + } catch (const std::exception &e) { + stats.recordCall(false, 0, kSmallPayloadSize); + stats.recordError("exception"); + } + + // Small delay between calls + std::this_thread::sleep_for(10ms); + } + 
}); + } + + // Progress reporting thread + std::thread progress_thread([&]() { + int last_total = 0; + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + int current_total = stats.totalCalls(); + int calls_per_30s = current_total - last_total; + last_total = current_total; + + std::cout << "[" << elapsed_seconds << "s] Total: " << current_total + << " | Success: " << stats.successfulCalls() + << " | Failed: " << stats.failedCalls() + << " | Rate: " << (calls_per_30s / 30.0) << " calls/sec" + << " | Received: " << total_received.load() << std::endl; + } + }); + + // Wait for test duration + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + std::cout << "\nStopping stress test..." << std::endl; + running.store(false); + + // Wait for all threads + for (auto &t : caller_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + // Verify results + EXPECT_GT(stats.successfulCalls(), 0) << "No successful calls"; + double success_rate = + (stats.totalCalls() > 0) + ? (100.0 * stats.successfulCalls() / stats.totalCalls()) + : 0.0; + EXPECT_GT(success_rate, 95.0) << "Success rate below 95%"; + + receiver_room->localParticipant()->unregisterRpcMethod( + "small-payload-stress"); + caller_room.reset(); + receiver_room.reset(); +} + // Stress test with varying payload sizes TEST_F(RpcStressTest, VaryingPayloadStress) { if (!config_.available) { @@ -439,18 +736,50 @@ TEST_F(RpcStressTest, VaryingPayloadStress) { std::map> received_by_size; std::mutex size_map_mutex; + // Response sizes to use (varying) + // Note: Leave room for metadata prefix "request_size:response_size:checksum:" + // which is about 25 bytes max + constexpr size_t kMetadataOverhead = 30; + std::vector response_sizes = { + 100, // Small (no compression) + 1024, // 1KB (compression threshold) + 5 * 1024, // 5KB + 10 * 1024, // 10KB + kMaxRpcPayloadSize - kMetadataOverhead // Max minus metadata overhead + }; + receiver_room->localParticipant()->registerRpcMethod( "varying-payload-stress", - [&](const RpcInvocationData &data) -> std::optional { + [&, response_sizes]( + const RpcInvocationData &data) -> std::optional { total_received++; - size_t size = data.payload.size(); + size_t request_size = data.payload.size(); { std::lock_guard lock(size_map_mutex); - received_by_size[size]++; + received_by_size[request_size]++; + } + + // Generate a random response payload of varying size + static thread_local std::random_device rd; + static thread_local std::mt19937 gen(rd()); + std::uniform_int_distribution dis(0, response_sizes.size() - 1); + size_t response_size = response_sizes[dis(gen)]; + + std::string response_payload = generateRandomPayload(response_size); + + // Calculate checksum for verification + size_t checksum = 0; + for (char c : response_payload) { + checksum += static_cast(c); } - return std::to_string(size); + // Return format: "request_size:response_size:checksum:payload" + // This allows sender to verify both request was received and response + // is correct + return std::to_string(request_size) + ":" + + std::to_string(response_size) + ":" + std::to_string(checksum) + + ":" + response_payload; }); auto caller_room = std::make_unique(); @@ -497,23 +826,69 @@ TEST_F(RpcStressTest, VaryingPayloadStress) { std::chrono::duration(call_end - call_start) .count(); - if 
(response == std::to_string(payload_size)) { + // Parse response format: + // "request_size:response_size:checksum:payload" + bool valid = false; + size_t first_colon = response.find(':'); + size_t second_colon = response.find(':', first_colon + 1); + size_t third_colon = response.find(':', second_colon + 1); + + if (first_colon != std::string::npos && + second_colon != std::string::npos && + third_colon != std::string::npos) { + size_t recv_request_size = + std::stoull(response.substr(0, first_colon)); + size_t recv_response_size = std::stoull(response.substr( + first_colon + 1, second_colon - first_colon - 1)); + size_t recv_checksum = std::stoull(response.substr( + second_colon + 1, third_colon - second_colon - 1)); + std::string recv_payload = response.substr(third_colon + 1); + + // Calculate actual checksum of received payload + size_t actual_checksum = 0; + for (char c : recv_payload) { + actual_checksum += static_cast(c); + } + + // Verify all fields + if (recv_request_size == payload_size && + recv_response_size == recv_payload.size() && + recv_checksum == actual_checksum) { + valid = true; + } else { + std::cerr << "[VARYING MISMATCH] sent_size=" << payload_size + << " recv_request_size=" << recv_request_size + << " recv_response_size=" << recv_response_size + << " actual_payload_size=" << recv_payload.size() + << " recv_checksum=" << recv_checksum + << " actual_checksum=" << actual_checksum << std::endl; + } + } else { + std::cerr << "[VARYING PARSE ERROR] response format invalid" + << std::endl; + } + + if (valid) { stats.recordCall(true, latency_ms, payload_size); } else { stats.recordCall(false, latency_ms, payload_size); - stats.recordError("size_mismatch"); + stats.recordError("verification_failed"); } } catch (const RpcError &e) { stats.recordCall(false, 0, payload_size); auto code = static_cast(e.code()); + std::cerr << "[RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { stats.recordError("timeout"); } else { stats.recordError("rpc_error"); } - } catch (const std::exception &) { + } catch (const std::exception &ex) { stats.recordCall(false, 0, payload_size); stats.recordError("exception"); + std::cerr << "[EXCEPTION] " << ex.what() << std::endl; } call_count++; @@ -610,20 +985,14 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::atomic a_received{0}; std::atomic b_received{0}; - // Register handlers on both sides - // Return checksum instead of full payload to avoid exceeding 15KB response - // limit + // Register handlers on both sides - echo payload back for verification room_a->localParticipant()->registerRpcMethod( "ping", [&a_received]( const RpcInvocationData &data) -> std::optional { a_received++; - size_t checksum = 0; - for (char c : data.payload) { - checksum += static_cast(c); - } - return "pong:" + std::to_string(data.payload.size()) + ":" + - std::to_string(checksum); + // Echo the payload back for round-trip verification + return data.payload; }); room_b->localParticipant()->registerRpcMethod( @@ -631,12 +1000,8 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { [&b_received]( const RpcInvocationData &data) -> std::optional { b_received++; - size_t checksum = 0; - for (char c : data.payload) { - checksum += static_cast(c); - } - return "pong:" + std::to_string(data.payload.size()) + ":" + - std::to_string(checksum); + // Echo the payload back for round-trip verification + return data.payload; }); StressTestStats stats_a_to_b; @@ 
-657,8 +1022,6 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { for (char c : payload) { expected_checksum += static_cast(c); } - std::string expected_response = "pong:" + std::to_string(payload.size()) + - ":" + std::to_string(expected_checksum); auto call_start = std::chrono::high_resolution_clock::now(); @@ -671,13 +1034,30 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::chrono::duration(call_end - call_start) .count(); - if (response == expected_response) { + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { stats_a_to_b.recordCall(true, latency_ms, kMaxRpcPayloadSize); } else { stats_a_to_b.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[A->B MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; } - } catch (...) { + } catch (const RpcError &e) { stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[A->B RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { + stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[A->B EXCEPTION] " << ex.what() << std::endl; } counter++; @@ -696,8 +1076,6 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { for (char c : payload) { expected_checksum += static_cast(c); } - std::string expected_response = "pong:" + std::to_string(payload.size()) + - ":" + std::to_string(expected_checksum); auto call_start = std::chrono::high_resolution_clock::now(); @@ -710,13 +1088,30 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::chrono::duration(call_end - call_start) .count(); - if (response == expected_response) { + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { stats_b_to_a.recordCall(true, latency_ms, kMaxRpcPayloadSize); } else { stats_b_to_a.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[B->A MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; } - } catch (...) 
{ + } catch (const RpcError &e) { stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[B->A RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { + stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[B->A EXCEPTION] " << ex.what() << std::endl; } counter++; @@ -799,7 +1194,8 @@ TEST_F(RpcStressTest, HighThroughputBurst) { [&total_received]( const RpcInvocationData &data) -> std::optional { total_received++; - return std::to_string(data.payload.size()); + // Echo the payload back for round-trip verification + return data.payload; }); auto caller_room = std::make_unique(); @@ -824,6 +1220,12 @@ TEST_F(RpcStressTest, HighThroughputBurst) { while (running.load()) { std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + auto call_start = std::chrono::high_resolution_clock::now(); try { @@ -835,9 +1237,30 @@ TEST_F(RpcStressTest, HighThroughputBurst) { std::chrono::duration(call_end - call_start) .count(); - stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); - } catch (...) { + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { + stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[BURST MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; + } + } catch (const RpcError &e) { + stats.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[BURST RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { stats.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[BURST EXCEPTION] " << ex.what() << std::endl; } // No delay - burst mode From 5a61a2e4e2467a16d34b76aaa57cae1e0fe06edf Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 9 Feb 2026 18:00:09 -0800 Subject: [PATCH 2/2] remove the un-needed logs --- src/tests/stress/test_rpc_stress.cpp | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index f3ecd5d..5ad6a28 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -570,14 +570,6 @@ TEST_F(RpcStressTest, SmallPayloadStress) { StressTestStats stats; std::atomic running{true}; - std::atomic packet_counter{0}; - - // Helper to get current timestamp in microseconds since epoch - auto get_timestamp_us = []() { - return std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - }; auto start_time = std::chrono::steady_clock::now(); auto duration = std::chrono::seconds(config_.duration_seconds); @@ -587,8 +579,6 @@ TEST_F(RpcStressTest, SmallPayloadStress) { for (int t = 0; t < config_.num_caller_threads; ++t) { caller_threads.emplace_back([&, thread_id = t]() { while (running.load()) { - int pkt_id = packet_counter.fetch_add(1); - // Use small payload that fits in single SCTP chunk std::string payload = 
generateRandomPayload(kSmallPayloadSize);
@@ -599,25 +589,16 @@ TEST_F(RpcStressTest, SmallPayloadStress) {
         }
 
         auto call_start = std::chrono::high_resolution_clock::now();
-        auto send_ts = get_timestamp_us();
-
-        std::cerr << "[LATENCY] SENDER pkt=" << pkt_id
-                  << " SEND_START ts=" << send_ts << std::endl;
 
         try {
           std::string response = caller_room->localParticipant()->performRpc(
               receiver_identity, "small-payload-stress", payload, 60.0);
 
           auto call_end = std::chrono::high_resolution_clock::now();
-          auto recv_ts = get_timestamp_us();
           double latency_ms =
               std::chrono::duration<double, std::milli>(call_end - call_start)
                   .count();
 
-          std::cerr << "[LATENCY] SENDER pkt=" << pkt_id
-                    << " RESPONSE_RECEIVED ts=" << recv_ts
-                    << " latency_ms=" << latency_ms << std::endl;
-
           // Verify response by comparing checksum
           size_t response_checksum = 0;
           for (char c : response) {