From ce6a048ea7e1b086577d099f2a90f5c0b22d3835 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Wed, 11 Feb 2026 15:50:37 -0800 Subject: [PATCH 1/3] adding connection and audio latency tests --- src/tests/common/test_common.h | 321 +++++++ src/tests/stress/test_latency_measurement.cpp | 353 ++++++++ src/tests/stress/test_rpc_stress.cpp | 822 +----------------- 3 files changed, 690 insertions(+), 806 deletions(-) create mode 100644 src/tests/common/test_common.h create mode 100644 src/tests/stress/test_latency_measurement.cpp diff --git a/src/tests/common/test_common.h b/src/tests/common/test_common.h new file mode 100644 index 0000000..f168406 --- /dev/null +++ b/src/tests/common/test_common.h @@ -0,0 +1,321 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +using namespace std::chrono_literals; + +// ============================================================================= +// Common Constants +// ============================================================================= + +// Default number of test iterations for connection tests +constexpr int kDefaultTestIterations = 10; + +// Default stress test duration in seconds +constexpr int kDefaultStressDurationSeconds = 600; // 10 minutes + +// ============================================================================= +// Common Test Configuration +// ============================================================================= + +/** + * Common test configuration loaded from environment variables. + * + * Environment variables: + * LIVEKIT_URL - WebSocket URL of the LiveKit server + * LIVEKIT_CALLER_TOKEN - Token for the caller/sender participant + * LIVEKIT_RECEIVER_TOKEN - Token for the receiver participant + * TEST_ITERATIONS - Number of iterations for iterative tests (default: + * 10) STRESS_DURATION_SECONDS - Duration for stress tests in seconds (default: + * 600) STRESS_CALLER_THREADS - Number of caller threads for stress tests + * (default: 4) + */ +struct TestConfig { + std::string url; + std::string caller_token; + std::string receiver_token; + int test_iterations; + int stress_duration_seconds; + int num_caller_threads; + bool available = false; + + static TestConfig fromEnv() { + TestConfig config; + const char *url = std::getenv("LIVEKIT_URL"); + const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *iterations_env = std::getenv("TEST_ITERATIONS"); + const char *duration_env = std::getenv("STRESS_DURATION_SECONDS"); + const char *threads_env = std::getenv("STRESS_CALLER_THREADS"); + + if (url && caller_token && receiver_token) { + config.url = url; + config.caller_token = caller_token; + config.receiver_token = receiver_token; + config.available = true; + } + + config.test_iterations = + iterations_env ? std::atoi(iterations_env) : kDefaultTestIterations; + config.stress_duration_seconds = + duration_env ? std::atoi(duration_env) : kDefaultStressDurationSeconds; + config.num_caller_threads = threads_env ? std::atoi(threads_env) : 4; + + return config; + } +}; + +// ============================================================================= +// Utility Functions +// ============================================================================= + +/// Get current timestamp in microseconds +inline uint64_t getTimestampUs() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +/// Wait for a remote participant to appear in the room +inline bool waitForParticipant(Room *room, const std::string &identity, + std::chrono::milliseconds timeout) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (room->remoteParticipant(identity) != nullptr) { + return true; + } + std::this_thread::sleep_for(100ms); + } + return false; +} + +// ============================================================================= +// Statistics Collection +// ============================================================================= + +/** + * Thread-safe latency statistics collector. + * Records latency measurements and provides summary statistics. + */ +class LatencyStats { +public: + void addMeasurement(double latency_ms) { + std::lock_guard lock(mutex_); + measurements_.push_back(latency_ms); + } + + void printStats(const std::string &title) const { + std::lock_guard lock(mutex_); + + if (measurements_.empty()) { + std::cout << "\n" << title << ": No measurements collected" << std::endl; + return; + } + + std::vector sorted = measurements_; + std::sort(sorted.begin(), sorted.end()); + + double sum = std::accumulate(sorted.begin(), sorted.end(), 0.0); + double avg = sum / sorted.size(); + double min = sorted.front(); + double max = sorted.back(); + double p50 = getPercentile(sorted, 50); + double p95 = getPercentile(sorted, 95); + double p99 = getPercentile(sorted, 99); + + std::cout << "\n========================================" << std::endl; + std::cout << " " << title << std::endl; + std::cout << "========================================" << std::endl; + std::cout << "Samples: " << sorted.size() << std::endl; + std::cout << std::fixed << std::setprecision(2); + std::cout << "Min: " << min << " ms" << std::endl; + std::cout << "Avg: " << avg << " ms" << std::endl; + std::cout << "P50: " << p50 << " ms" << std::endl; + std::cout << "P95: " << p95 << " ms" << std::endl; + std::cout << "P99: " << p99 << " ms" << std::endl; + std::cout << "Max: " << max << " ms" << std::endl; + std::cout << "========================================\n" << std::endl; + } + + size_t count() const { + std::lock_guard lock(mutex_); + return measurements_.size(); + } + + void clear() { + std::lock_guard lock(mutex_); + measurements_.clear(); + } + +private: + static double getPercentile(const std::vector &sorted, + int percentile) { + if (sorted.empty()) + return 0.0; + size_t index = (sorted.size() * percentile) / 100; + if (index >= sorted.size()) + index = sorted.size() - 1; + return sorted[index]; + } + + mutable std::mutex mutex_; + std::vector measurements_; +}; + +/** + * Extended statistics collector for stress tests. + * Tracks success/failure counts, bytes transferred, and error breakdown. + */ +class StressTestStats { +public: + void recordCall(bool success, double latency_ms, size_t payload_size = 0) { + std::lock_guard lock(mutex_); + total_calls_++; + if (success) { + successful_calls_++; + latencies_.push_back(latency_ms); + total_bytes_ += payload_size; + } else { + failed_calls_++; + } + } + + void recordError(const std::string &error_type) { + std::lock_guard lock(mutex_); + error_counts_[error_type]++; + } + + void printStats(const std::string &title = "Stress Test Statistics") const { + std::lock_guard lock(mutex_); + + std::cout << "\n========================================" << std::endl; + std::cout << " " << title << std::endl; + std::cout << "========================================" << std::endl; + std::cout << "Total calls: " << total_calls_ << std::endl; + std::cout << "Successful: " << successful_calls_ << std::endl; + std::cout << "Failed: " << failed_calls_ << std::endl; + std::cout << "Success rate: " << std::fixed << std::setprecision(2) + << (total_calls_ > 0 ? (100.0 * successful_calls_ / total_calls_) + : 0.0) + << "%" << std::endl; + std::cout << "Total bytes: " << total_bytes_ << " (" + << (total_bytes_ / (1024.0 * 1024.0)) << " MB)" << std::endl; + + if (!latencies_.empty()) { + std::vector sorted_latencies = latencies_; + std::sort(sorted_latencies.begin(), sorted_latencies.end()); + + double sum = std::accumulate(sorted_latencies.begin(), + sorted_latencies.end(), 0.0); + double avg = sum / sorted_latencies.size(); + double min = sorted_latencies.front(); + double max = sorted_latencies.back(); + double p50 = sorted_latencies[sorted_latencies.size() * 50 / 100]; + double p95 = sorted_latencies[sorted_latencies.size() * 95 / 100]; + double p99 = sorted_latencies[sorted_latencies.size() * 99 / 100]; + + std::cout << "\nLatency (ms):" << std::endl; + std::cout << " Min: " << min << std::endl; + std::cout << " Avg: " << avg << std::endl; + std::cout << " P50: " << p50 << std::endl; + std::cout << " P95: " << p95 << std::endl; + std::cout << " P99: " << p99 << std::endl; + std::cout << " Max: " << max << std::endl; + } + + if (!error_counts_.empty()) { + std::cout << "\nError breakdown:" << std::endl; + for (const auto &pair : error_counts_) { + std::cout << " " << pair.first << ": " << pair.second << std::endl; + } + } + + std::cout << "========================================\n" << std::endl; + } + + int totalCalls() const { + std::lock_guard lock(mutex_); + return total_calls_; + } + + int successfulCalls() const { + std::lock_guard lock(mutex_); + return successful_calls_; + } + + int failedCalls() const { + std::lock_guard lock(mutex_); + return failed_calls_; + } + +private: + mutable std::mutex mutex_; + int total_calls_ = 0; + int successful_calls_ = 0; + int failed_calls_ = 0; + size_t total_bytes_ = 0; + std::vector latencies_; + std::map error_counts_; +}; + +// ============================================================================= +// Base Test Fixture +// ============================================================================= + +/** + * Base test fixture that handles SDK initialization and configuration loading. + */ +class LiveKitTestBase : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogSink::kConsole); + config_ = TestConfig::fromEnv(); + } + + void TearDown() override { livekit::shutdown(); } + + /// Skip the test if the required environment variables are not set + void skipIfNotConfigured() { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + } + + TestConfig config_; +}; + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp new file mode 100644 index 0000000..2c95859 --- /dev/null +++ b/src/tests/stress/test_latency_measurement.cpp @@ -0,0 +1,353 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../common/test_common.h" +#include +#include + +namespace livekit { +namespace test { + +// Audio configuration for latency test +constexpr int kAudioSampleRate = 48000; +constexpr int kAudioChannels = 1; +constexpr int kAudioFrameDurationMs = 10; +constexpr int kSamplesPerFrame = + kAudioSampleRate * kAudioFrameDurationMs / 1000; + +// Energy threshold for detecting high-energy frames +constexpr double kHighEnergyThreshold = 0.3; + +// Number of consecutive high-energy frames to send per pulse +// (helps survive WebRTC audio processing smoothing) +constexpr int kHighEnergyFramesPerPulse = 5; + +// ============================================================================= +// Audio Helper Functions +// ============================================================================= + +/// Calculate RMS energy of audio samples (normalized to [-1, 1] range) +static double calculateEnergy(const std::vector &samples) { + if (samples.empty()) + return 0.0; + double sum_squared = 0.0; + for (int16_t sample : samples) { + double normalized = static_cast(sample) / 32768.0; + sum_squared += normalized * normalized; + } + return std::sqrt(sum_squared / samples.size()); +} + +/// Generate a high-energy audio frame (sine wave at max amplitude) +static std::vector generateHighEnergyFrame(int samples_per_channel) { + std::vector data(samples_per_channel * kAudioChannels); + const double frequency = 1000.0; // 1kHz sine wave + const double amplitude = 30000.0; // Near max for int16 + for (int i = 0; i < samples_per_channel; ++i) { + double t = static_cast(i) / kAudioSampleRate; + int16_t sample = + static_cast(amplitude * std::sin(2.0 * M_PI * frequency * t)); + for (int ch = 0; ch < kAudioChannels; ++ch) { + data[i * kAudioChannels + ch] = sample; + } + } + return data; +} + +/// Generate a low-energy (silent) audio frame +static std::vector generateSilentFrame(int samples_per_channel) { + return std::vector(samples_per_channel * kAudioChannels, 0); +} + +// ============================================================================= +// Test Fixture +// ============================================================================= + +class LatencyMeasurementTest : public LiveKitTestBase {}; + +// ============================================================================= +// Test 1: Connection Time Measurement +// ============================================================================= +TEST_F(LatencyMeasurementTest, ConnectionTime) { + skipIfNotConfigured(); + + std::cout << "\n=== Connection Time Measurement Test ===" << std::endl; + std::cout << "Iterations: " << config_.test_iterations << std::endl; + + LatencyStats stats; + RoomOptions options; + options.auto_subscribe = true; + + for (int i = 0; i < config_.test_iterations; ++i) { + auto room = std::make_unique(); + + auto start = std::chrono::high_resolution_clock::now(); + bool connected = room->Connect(config_.url, config_.caller_token, options); + auto end = std::chrono::high_resolution_clock::now(); + + if (connected) { + double latency_ms = + std::chrono::duration(end - start).count(); + stats.addMeasurement(latency_ms); + std::cout << " Iteration " << (i + 1) << ": " << std::fixed + << std::setprecision(2) << latency_ms << " ms" << std::endl; + } else { + std::cout << " Iteration " << (i + 1) << ": FAILED to connect" + << std::endl; + } + + // Small delay between iterations to allow cleanup + std::this_thread::sleep_for(500ms); + } + + stats.printStats("Connection Time Statistics"); + + EXPECT_GT(stats.count(), 0) << "At least one connection should succeed"; +} + +// ============================================================================= +// Test 2: Audio Latency Measurement using Energy Detection +// ============================================================================= +class AudioLatencyDelegate : public RoomDelegate { +public: + void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override { + std::lock_guard lock(mutex_); + if (event.track && event.track->kind() == TrackKind::KIND_AUDIO) { + subscribed_audio_track_ = event.track; + track_cv_.notify_all(); + } + } + + std::shared_ptr waitForAudioTrack(std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + if (track_cv_.wait_for(lock, timeout, [this] { + return subscribed_audio_track_ != nullptr; + })) { + return subscribed_audio_track_; + } + return nullptr; + } + +private: + std::mutex mutex_; + std::condition_variable track_cv_; + std::shared_ptr subscribed_audio_track_; +}; + +TEST_F(LatencyMeasurementTest, AudioLatency) { + skipIfNotConfigured(); + + std::cout << "\n=== Audio Latency Measurement Test ===" << std::endl; + std::cout << "Using energy detection to measure audio round-trip latency" + << std::endl; + + // Create receiver room with delegate + auto receiver_room = std::make_unique(); + AudioLatencyDelegate receiver_delegate; + receiver_room->setDelegate(&receiver_delegate); + + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + std::cout << "Receiver connected as: " << receiver_identity << std::endl; + + // Create sender room (using caller_token) + auto sender_room = std::make_unique(); + bool sender_connected = + sender_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(sender_connected) << "Sender failed to connect"; + + std::string sender_identity = sender_room->localParticipant()->identity(); + std::cout << "Sender connected as: " << sender_identity << std::endl; + + // Wait for sender to be visible to receiver + ASSERT_TRUE(waitForParticipant(receiver_room.get(), sender_identity, 10s)) + << "Sender not visible to receiver"; + + // Create audio source in real-time mode (queue_size_ms = 0) + // We'll pace the frames ourselves to match real-time delivery + auto audio_source = + std::make_shared(kAudioSampleRate, kAudioChannels, 0); + auto audio_track = + LocalAudioTrack::createLocalAudioTrack("latency-test", audio_source); + + TrackPublishOptions publish_options; + auto publication = sender_room->localParticipant()->publishTrack( + audio_track, publish_options); + ASSERT_NE(publication, nullptr) << "Failed to publish audio track"; + + std::cout << "Audio track published, waiting for subscription..." + << std::endl; + + // Wait for receiver to subscribe to the audio track + auto subscribed_track = receiver_delegate.waitForAudioTrack(10s); + ASSERT_NE(subscribed_track, nullptr) + << "Receiver did not subscribe to audio track"; + + std::cout << "Audio track subscribed, creating audio stream..." << std::endl; + + // Create audio stream from the subscribed track + AudioStream::Options stream_options; + stream_options.capacity = 100; // Small buffer to reduce latency + auto audio_stream = AudioStream::fromTrack(subscribed_track, stream_options); + ASSERT_NE(audio_stream, nullptr) << "Failed to create audio stream"; + + // Statistics for latency measurements + LatencyStats stats; + std::atomic running{true}; + std::atomic last_high_energy_send_time_us{0}; + std::atomic waiting_for_echo{false}; + std::atomic missed_pulses{0}; + + // Timeout for waiting for echo (2 seconds) + constexpr uint64_t kEchoTimeoutUs = 2000000; + + // Receiver thread: detect high energy frames and calculate latency + std::thread receiver_thread([&]() { + AudioFrameEvent event; + while (running.load() && audio_stream->read(event)) { + double energy = calculateEnergy(event.frame.data()); + + if (waiting_for_echo.load() && energy > kHighEnergyThreshold) { + uint64_t receive_time_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t send_time_us = last_high_energy_send_time_us.load(); + + if (send_time_us > 0) { + double latency_ms = (receive_time_us - send_time_us) / 1000.0; + if (latency_ms > 0 && latency_ms < 5000) { // Sanity check + stats.addMeasurement(latency_ms); + std::cout << " Audio latency: " << std::fixed + << std::setprecision(2) << latency_ms << " ms" + << " (energy: " << std::setprecision(3) << energy << ")" + << std::endl; + } + waiting_for_echo.store(false); + } + } + } + }); + + // Sender thread: send audio frames in real-time (10ms audio every 10ms) + // Hijack periodic frames with high energy for latency measurement + std::thread sender_thread([&]() { + int frame_count = 0; + const int frames_between_pulses = + 100; // Send pulse every 100 frames (~1 second) + const int total_pulses = 10; + int pulses_sent = 0; + uint64_t pulse_send_time = 0; + int high_energy_frames_remaining = + 0; // Counter for consecutive high-energy frames + + // Use steady timing to maintain real-time pace + auto next_frame_time = std::chrono::steady_clock::now(); + const auto frame_duration = + std::chrono::milliseconds(kAudioFrameDurationMs); + + while (running.load() && pulses_sent < total_pulses) { + // Wait until it's time to send the next frame (real-time pacing) + std::this_thread::sleep_until(next_frame_time); + next_frame_time += frame_duration; + + std::vector frame_data; + + // Check for echo timeout + if (waiting_for_echo.load() && pulse_send_time > 0) { + uint64_t now_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (now_us - pulse_send_time > kEchoTimeoutUs) { + std::cout << " Echo timeout for pulse " << pulses_sent + << ", moving on..." << std::endl; + waiting_for_echo.store(false); + missed_pulses++; + pulse_send_time = 0; + high_energy_frames_remaining = 0; + } + } + + // Continue sending high-energy frames if we're in the middle of a pulse + if (high_energy_frames_remaining > 0) { + frame_data = generateHighEnergyFrame(kSamplesPerFrame); + high_energy_frames_remaining--; + } else if (frame_count % frames_between_pulses == 0 && + !waiting_for_echo.load()) { + // Start a new pulse - send multiple consecutive high-energy frames + frame_data = generateHighEnergyFrame(kSamplesPerFrame); + high_energy_frames_remaining = + kHighEnergyFramesPerPulse - 1; // -1 because we're sending one now + + pulse_send_time = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + last_high_energy_send_time_us.store(pulse_send_time); + waiting_for_echo.store(true); + pulses_sent++; + + std::cout << "Sent pulse " << pulses_sent << "/" << total_pulses << " (" + << kHighEnergyFramesPerPulse << " frames)" << std::endl; + } else { + // Send silence (but still real audio frames for proper timing) + frame_data = generateSilentFrame(kSamplesPerFrame); + } + + AudioFrame frame(std::move(frame_data), kAudioSampleRate, kAudioChannels, + kSamplesPerFrame); + + try { + audio_source->captureFrame(frame); + } catch (const std::exception &e) { + std::cerr << "Error capturing frame: " << e.what() << std::endl; + } + + frame_count++; + } + + // Wait a bit for last echo to arrive + std::this_thread::sleep_for(2s); + running.store(false); + }); + + // Wait for threads to complete + sender_thread.join(); + audio_stream->close(); + receiver_thread.join(); + + stats.printStats("Audio Latency Statistics"); + + if (missed_pulses > 0) { + std::cout << "Missed pulses (timeout): " << missed_pulses << std::endl; + } + + // Clean up + sender_room->localParticipant()->unpublishTrack(publication->sid()); + + EXPECT_GT(stats.count(), 0) + << "At least one audio latency measurement should be recorded"; +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index 5ad6a28..38f4226 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -14,160 +14,17 @@ * limitations under the License. */ -#include -#include -#include -#include +#include "../common/test_common.h" #include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include -#include namespace livekit { namespace test { -using namespace std::chrono_literals; - // Maximum RPC payload size (15KB) constexpr size_t kMaxRpcPayloadSize = 15 * 1024; -// Default stress test duration in seconds (can be overridden by env var) -constexpr int kDefaultStressDurationSeconds = 600; // 10 minutes - -// Test configuration from environment variables -struct RpcStressTestConfig { - std::string url; - std::string caller_token; - std::string receiver_token; - int duration_seconds; - int num_caller_threads; - bool available = false; - - static RpcStressTestConfig fromEnv() { - RpcStressTestConfig config; - const char *url = std::getenv("LIVEKIT_URL"); - const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); - const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); - const char *duration_env = std::getenv("RPC_STRESS_DURATION_SECONDS"); - const char *threads_env = std::getenv("RPC_STRESS_CALLER_THREADS"); - - if (url && caller_token && receiver_token) { - config.url = url; - config.caller_token = caller_token; - config.receiver_token = receiver_token; - config.available = true; - } - - config.duration_seconds = - duration_env ? std::atoi(duration_env) : kDefaultStressDurationSeconds; - config.num_caller_threads = threads_env ? std::atoi(threads_env) : 4; - - return config; - } -}; - -// Statistics collector -class StressTestStats { -public: - void recordCall(bool success, double latency_ms, size_t payload_size) { - std::lock_guard lock(mutex_); - total_calls_++; - if (success) { - successful_calls_++; - latencies_.push_back(latency_ms); - total_bytes_ += payload_size; - } else { - failed_calls_++; - } - } - - void recordError(const std::string &error_type) { - std::lock_guard lock(mutex_); - error_counts_[error_type]++; - } - - void printStats() const { - std::lock_guard lock(mutex_); - - std::cout << "\n========================================" << std::endl; - std::cout << " RPC Stress Test Statistics " << std::endl; - std::cout << "========================================" << std::endl; - std::cout << "Total calls: " << total_calls_ << std::endl; - std::cout << "Successful: " << successful_calls_ << std::endl; - std::cout << "Failed: " << failed_calls_ << std::endl; - std::cout << "Success rate: " << std::fixed << std::setprecision(2) - << (total_calls_ > 0 ? (100.0 * successful_calls_ / total_calls_) - : 0.0) - << "%" << std::endl; - std::cout << "Total bytes: " << total_bytes_ << " (" - << (total_bytes_ / (1024.0 * 1024.0)) << " MB)" << std::endl; - - if (!latencies_.empty()) { - std::vector sorted_latencies = latencies_; - std::sort(sorted_latencies.begin(), sorted_latencies.end()); - - double sum = std::accumulate(sorted_latencies.begin(), - sorted_latencies.end(), 0.0); - double avg = sum / sorted_latencies.size(); - double min = sorted_latencies.front(); - double max = sorted_latencies.back(); - double p50 = sorted_latencies[sorted_latencies.size() * 50 / 100]; - double p95 = sorted_latencies[sorted_latencies.size() * 95 / 100]; - double p99 = sorted_latencies[sorted_latencies.size() * 99 / 100]; - - std::cout << "\nLatency (ms):" << std::endl; - std::cout << " Min: " << min << std::endl; - std::cout << " Avg: " << avg << std::endl; - std::cout << " P50: " << p50 << std::endl; - std::cout << " P95: " << p95 << std::endl; - std::cout << " P99: " << p99 << std::endl; - std::cout << " Max: " << max << std::endl; - } - - if (!error_counts_.empty()) { - std::cout << "\nError breakdown:" << std::endl; - for (const auto &pair : error_counts_) { - std::cout << " " << pair.first << ": " << pair.second << std::endl; - } - } - - std::cout << "========================================\n" << std::endl; - } - - int totalCalls() const { - std::lock_guard lock(mutex_); - return total_calls_; - } - - int successfulCalls() const { - std::lock_guard lock(mutex_); - return successful_calls_; - } - - int failedCalls() const { - std::lock_guard lock(mutex_); - return failed_calls_; - } - -private: - mutable std::mutex mutex_; - int total_calls_ = 0; - int successful_calls_ = 0; - int failed_calls_ = 0; - size_t total_bytes_ = 0; - std::vector latencies_; - std::map error_counts_; -}; - // Path to test data file (relative to repo root) static const char *kTestDataFile = "data/rpc_test_data.txt"; @@ -285,40 +142,14 @@ std::string generateRandomPayload(size_t size) { return truncateUtf8AndPad(result, size); } -// Wait for a remote participant to appear -bool waitForParticipant(Room *room, const std::string &identity, - std::chrono::milliseconds timeout) { - auto start = std::chrono::steady_clock::now(); - while (std::chrono::steady_clock::now() - start < timeout) { - if (room->remoteParticipant(identity) != nullptr) { - return true; - } - std::this_thread::sleep_for(100ms); - } - return false; -} - -class RpcStressTest : public ::testing::Test { -protected: - void SetUp() override { - livekit::initialize(livekit::LogSink::kConsole); - config_ = RpcStressTestConfig::fromEnv(); - } - - void TearDown() override { livekit::shutdown(); } - - RpcStressTestConfig config_; -}; +class RpcStressTest : public LiveKitTestBase {}; // Long-running stress test with max payload sizes TEST_F(RpcStressTest, MaxPayloadStress) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; - } + skipIfNotConfigured(); std::cout << "\n=== RPC Max Payload Stress Test ===" << std::endl; - std::cout << "Duration: " << config_.duration_seconds << " seconds" + std::cout << "Duration: " << config_.stress_duration_seconds << " seconds" << std::endl; std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; std::cout << "Max payload size: " << kMaxRpcPayloadSize << " bytes (15KB)" @@ -373,7 +204,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { std::atomic running{true}; auto start_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::seconds(config_.duration_seconds); + auto duration = std::chrono::seconds(config_.stress_duration_seconds); // Create caller threads std::vector caller_threads; @@ -489,7 +320,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { } progress_thread.join(); - stats.printStats(); + stats.printStats("RPC Max Payload Stress Test"); // Verify results EXPECT_GT(stats.successfulCalls(), 0) << "No successful calls"; @@ -507,16 +338,13 @@ TEST_F(RpcStressTest, MaxPayloadStress) { // Small payload stress test - fits in single SCTP chunk (no fragmentation) // SCTP MTU is ~1200 bytes, so we use 1000 bytes to leave room for headers TEST_F(RpcStressTest, SmallPayloadStress) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; - } + skipIfNotConfigured(); // Small payload that fits in single SCTP chunk (no fragmentation overhead) constexpr size_t kSmallPayloadSize = 1000; std::cout << "\n=== RPC Small Payload Stress Test ===" << std::endl; - std::cout << "Duration: " << config_.duration_seconds << " seconds" + std::cout << "Duration: " << config_.stress_duration_seconds << " seconds" << std::endl; std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; std::cout << "Payload size: " << kSmallPayloadSize @@ -540,13 +368,12 @@ TEST_F(RpcStressTest, SmallPayloadStress) { std::atomic total_received{0}; - // Register RPC handler that echoes small payloads + // Register RPC handler receiver_room->localParticipant()->registerRpcMethod( "small-payload-stress", [&total_received]( const RpcInvocationData &data) -> std::optional { total_received++; - // Echo the payload back for round-trip verification return data.payload; }); @@ -565,24 +392,21 @@ TEST_F(RpcStressTest, SmallPayloadStress) { waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; - std::cout << "Both rooms connected. Starting small payload stress test..." - << std::endl; + std::cout << "Both rooms connected. Starting stress test..." << std::endl; StressTestStats stats; std::atomic running{true}; auto start_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::seconds(config_.duration_seconds); + auto duration = std::chrono::seconds(config_.stress_duration_seconds); // Create caller threads std::vector caller_threads; for (int t = 0; t < config_.num_caller_threads; ++t) { caller_threads.emplace_back([&, thread_id = t]() { while (running.load()) { - // Use small payload that fits in single SCTP chunk std::string payload = generateRandomPayload(kSmallPayloadSize); - // Calculate expected checksum size_t expected_checksum = 0; for (char c : payload) { expected_checksum += static_cast(c); @@ -599,7 +423,6 @@ TEST_F(RpcStressTest, SmallPayloadStress) { std::chrono::duration(call_end - call_start) .count(); - // Verify response by comparing checksum size_t response_checksum = 0; for (char c : response) { response_checksum += static_cast(c); @@ -624,6 +447,8 @@ TEST_F(RpcStressTest, SmallPayloadStress) { stats.recordError("timeout"); } else if (code == RpcError::ErrorCode::CONNECTION_TIMEOUT) { stats.recordError("connection_timeout"); + } else if (code == RpcError::ErrorCode::RECIPIENT_DISCONNECTED) { + stats.recordError("recipient_disconnected"); } else { stats.recordError("rpc_error_" + std::to_string(e.code())); } @@ -632,8 +457,8 @@ TEST_F(RpcStressTest, SmallPayloadStress) { stats.recordError("exception"); } - // Small delay between calls - std::this_thread::sleep_for(10ms); + // Minimal delay for small payloads + std::this_thread::sleep_for(5ms); } }); } @@ -675,7 +500,7 @@ TEST_F(RpcStressTest, SmallPayloadStress) { } progress_thread.join(); - stats.printStats(); + stats.printStats("RPC Small Payload Stress Test"); // Verify results EXPECT_GT(stats.successfulCalls(), 0) << "No successful calls"; @@ -691,620 +516,5 @@ TEST_F(RpcStressTest, SmallPayloadStress) { receiver_room.reset(); } -// Stress test with varying payload sizes -TEST_F(RpcStressTest, VaryingPayloadStress) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; - } - - std::cout << "\n=== RPC Varying Payload Stress Test ===" << std::endl; - std::cout << "Duration: " << config_.duration_seconds << " seconds" - << std::endl; - std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; - - auto receiver_room = std::make_unique(); - RoomOptions options; - options.auto_subscribe = true; - - bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; - - std::string receiver_identity = receiver_room->localParticipant()->identity(); - - std::atomic total_received{0}; - std::map> received_by_size; - std::mutex size_map_mutex; - - // Response sizes to use (varying) - // Note: Leave room for metadata prefix "request_size:response_size:checksum:" - // which is about 25 bytes max - constexpr size_t kMetadataOverhead = 30; - std::vector response_sizes = { - 100, // Small (no compression) - 1024, // 1KB (compression threshold) - 5 * 1024, // 5KB - 10 * 1024, // 10KB - kMaxRpcPayloadSize - kMetadataOverhead // Max minus metadata overhead - }; - - receiver_room->localParticipant()->registerRpcMethod( - "varying-payload-stress", - [&, response_sizes]( - const RpcInvocationData &data) -> std::optional { - total_received++; - size_t request_size = data.payload.size(); - - { - std::lock_guard lock(size_map_mutex); - received_by_size[request_size]++; - } - - // Generate a random response payload of varying size - static thread_local std::random_device rd; - static thread_local std::mt19937 gen(rd()); - std::uniform_int_distribution dis(0, response_sizes.size() - 1); - size_t response_size = response_sizes[dis(gen)]; - - std::string response_payload = generateRandomPayload(response_size); - - // Calculate checksum for verification - size_t checksum = 0; - for (char c : response_payload) { - checksum += static_cast(c); - } - - // Return format: "request_size:response_size:checksum:payload" - // This allows sender to verify both request was received and response - // is correct - return std::to_string(request_size) + ":" + - std::to_string(response_size) + ":" + std::to_string(checksum) + - ":" + response_payload; - }); - - auto caller_room = std::make_unique(); - bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; - - bool receiver_visible = - waitForParticipant(caller_room.get(), receiver_identity, 10s); - ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; - - StressTestStats stats; - std::atomic running{true}; - - // Payload sizes to test - std::vector payload_sizes = { - 100, // Small - 1024, // 1KB - 5 * 1024, // 5KB - 10 * 1024, // 10KB - kMaxRpcPayloadSize - 1, // Just under max - kMaxRpcPayloadSize // Max (15KB) - }; - - auto start_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::seconds(config_.duration_seconds); - - std::vector caller_threads; - for (int t = 0; t < config_.num_caller_threads; ++t) { - caller_threads.emplace_back([&, thread_id = t]() { - int call_count = 0; - while (running.load()) { - size_t payload_size = payload_sizes[call_count % payload_sizes.size()]; - std::string payload = generateRandomPayload(payload_size); - - auto call_start = std::chrono::high_resolution_clock::now(); - - try { - std::string response = caller_room->localParticipant()->performRpc( - receiver_identity, "varying-payload-stress", payload, 60.0); - - auto call_end = std::chrono::high_resolution_clock::now(); - double latency_ms = - std::chrono::duration(call_end - call_start) - .count(); - - // Parse response format: - // "request_size:response_size:checksum:payload" - bool valid = false; - size_t first_colon = response.find(':'); - size_t second_colon = response.find(':', first_colon + 1); - size_t third_colon = response.find(':', second_colon + 1); - - if (first_colon != std::string::npos && - second_colon != std::string::npos && - third_colon != std::string::npos) { - size_t recv_request_size = - std::stoull(response.substr(0, first_colon)); - size_t recv_response_size = std::stoull(response.substr( - first_colon + 1, second_colon - first_colon - 1)); - size_t recv_checksum = std::stoull(response.substr( - second_colon + 1, third_colon - second_colon - 1)); - std::string recv_payload = response.substr(third_colon + 1); - - // Calculate actual checksum of received payload - size_t actual_checksum = 0; - for (char c : recv_payload) { - actual_checksum += static_cast(c); - } - - // Verify all fields - if (recv_request_size == payload_size && - recv_response_size == recv_payload.size() && - recv_checksum == actual_checksum) { - valid = true; - } else { - std::cerr << "[VARYING MISMATCH] sent_size=" << payload_size - << " recv_request_size=" << recv_request_size - << " recv_response_size=" << recv_response_size - << " actual_payload_size=" << recv_payload.size() - << " recv_checksum=" << recv_checksum - << " actual_checksum=" << actual_checksum << std::endl; - } - } else { - std::cerr << "[VARYING PARSE ERROR] response format invalid" - << std::endl; - } - - if (valid) { - stats.recordCall(true, latency_ms, payload_size); - } else { - stats.recordCall(false, latency_ms, payload_size); - stats.recordError("verification_failed"); - } - } catch (const RpcError &e) { - stats.recordCall(false, 0, payload_size); - auto code = static_cast(e.code()); - std::cerr << "[RPC ERROR] code=" << e.code() << " message=\"" - << e.message() << "\"" - << " data=\"" << e.data() << "\"" << std::endl; - if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { - stats.recordError("timeout"); - } else { - stats.recordError("rpc_error"); - } - } catch (const std::exception &ex) { - stats.recordCall(false, 0, payload_size); - stats.recordError("exception"); - std::cerr << "[EXCEPTION] " << ex.what() << std::endl; - } - - call_count++; - std::this_thread::sleep_for(5ms); - } - }); - } - - // Progress reporting - std::thread progress_thread([&]() { - while (running.load()) { - std::this_thread::sleep_for(30s); - if (!running.load()) - break; - - auto elapsed = std::chrono::steady_clock::now() - start_time; - auto elapsed_seconds = - std::chrono::duration_cast(elapsed).count(); - - std::cout << "[" << elapsed_seconds << "s] Total: " << stats.totalCalls() - << " | Success: " << stats.successfulCalls() - << " | Failed: " << stats.failedCalls() << std::endl; - } - }); - - while (std::chrono::steady_clock::now() - start_time < duration) { - std::this_thread::sleep_for(1s); - } - - running.store(false); - - for (auto &t : caller_threads) { - t.join(); - } - progress_thread.join(); - - stats.printStats(); - - // Print breakdown by size - std::cout << "Received by payload size:" << std::endl; - { - std::lock_guard lock(size_map_mutex); - for (const auto &pair : received_by_size) { - std::cout << " " << pair.first << " bytes: " << pair.second.load() - << std::endl; - } - } - - EXPECT_GT(stats.successfulCalls(), 0); - double success_rate = - (stats.totalCalls() > 0) - ? (100.0 * stats.successfulCalls() / stats.totalCalls()) - : 0.0; - EXPECT_GT(success_rate, 95.0) << "Success rate below 95%"; - - receiver_room->localParticipant()->unregisterRpcMethod( - "varying-payload-stress"); - caller_room.reset(); - receiver_room.reset(); -} - -// Stress test for bidirectional RPC (both sides can call each other) -TEST_F(RpcStressTest, BidirectionalRpcStress) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; - } - - std::cout << "\n=== Bidirectional RPC Stress Test ===" << std::endl; - std::cout << "Duration: " << config_.duration_seconds << " seconds" - << std::endl; - - auto room_a = std::make_unique(); - auto room_b = std::make_unique(); - RoomOptions options; - options.auto_subscribe = true; - - bool a_connected = - room_a->Connect(config_.url, config_.caller_token, options); - ASSERT_TRUE(a_connected) << "Room A failed to connect"; - - bool b_connected = - room_b->Connect(config_.url, config_.receiver_token, options); - ASSERT_TRUE(b_connected) << "Room B failed to connect"; - - std::string identity_a = room_a->localParticipant()->identity(); - std::string identity_b = room_b->localParticipant()->identity(); - - ASSERT_TRUE(waitForParticipant(room_a.get(), identity_b, 10s)) - << "Room B not visible to Room A"; - ASSERT_TRUE(waitForParticipant(room_b.get(), identity_a, 10s)) - << "Room A not visible to Room B"; - - std::atomic a_received{0}; - std::atomic b_received{0}; - - // Register handlers on both sides - echo payload back for verification - room_a->localParticipant()->registerRpcMethod( - "ping", - [&a_received]( - const RpcInvocationData &data) -> std::optional { - a_received++; - // Echo the payload back for round-trip verification - return data.payload; - }); - - room_b->localParticipant()->registerRpcMethod( - "ping", - [&b_received]( - const RpcInvocationData &data) -> std::optional { - b_received++; - // Echo the payload back for round-trip verification - return data.payload; - }); - - StressTestStats stats_a_to_b; - StressTestStats stats_b_to_a; - std::atomic running{true}; - - auto start_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::seconds(config_.duration_seconds); - - // A calling B - std::thread thread_a_to_b([&]() { - int counter = 0; - while (running.load()) { - std::string payload = generateRandomPayload(kMaxRpcPayloadSize); - - // Calculate expected checksum for verification - size_t expected_checksum = 0; - for (char c : payload) { - expected_checksum += static_cast(c); - } - - auto call_start = std::chrono::high_resolution_clock::now(); - - try { - std::string response = room_a->localParticipant()->performRpc( - identity_b, "ping", payload, 60.0); - - auto call_end = std::chrono::high_resolution_clock::now(); - double latency_ms = - std::chrono::duration(call_end - call_start) - .count(); - - // Verify response by comparing checksum - size_t response_checksum = 0; - for (char c : response) { - response_checksum += static_cast(c); - } - - if (response.size() == payload.size() && - response_checksum == expected_checksum) { - stats_a_to_b.recordCall(true, latency_ms, kMaxRpcPayloadSize); - } else { - stats_a_to_b.recordCall(false, latency_ms, kMaxRpcPayloadSize); - std::cerr << "[A->B MISMATCH] sent size=" << payload.size() - << " checksum=" << expected_checksum - << " | received size=" << response.size() - << " checksum=" << response_checksum << std::endl; - } - } catch (const RpcError &e) { - stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[A->B RPC ERROR] code=" << e.code() << " message=\"" - << e.message() << "\"" - << " data=\"" << e.data() << "\"" << std::endl; - } catch (const std::exception &ex) { - stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[A->B EXCEPTION] " << ex.what() << std::endl; - } - - counter++; - std::this_thread::sleep_for(20ms); - } - }); - - // B calling A - std::thread thread_b_to_a([&]() { - int counter = 0; - while (running.load()) { - std::string payload = generateRandomPayload(kMaxRpcPayloadSize); - - // Calculate expected checksum for verification - size_t expected_checksum = 0; - for (char c : payload) { - expected_checksum += static_cast(c); - } - - auto call_start = std::chrono::high_resolution_clock::now(); - - try { - std::string response = room_b->localParticipant()->performRpc( - identity_a, "ping", payload, 60.0); - - auto call_end = std::chrono::high_resolution_clock::now(); - double latency_ms = - std::chrono::duration(call_end - call_start) - .count(); - - // Verify response by comparing checksum - size_t response_checksum = 0; - for (char c : response) { - response_checksum += static_cast(c); - } - - if (response.size() == payload.size() && - response_checksum == expected_checksum) { - stats_b_to_a.recordCall(true, latency_ms, kMaxRpcPayloadSize); - } else { - stats_b_to_a.recordCall(false, latency_ms, kMaxRpcPayloadSize); - std::cerr << "[B->A MISMATCH] sent size=" << payload.size() - << " checksum=" << expected_checksum - << " | received size=" << response.size() - << " checksum=" << response_checksum << std::endl; - } - } catch (const RpcError &e) { - stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[B->A RPC ERROR] code=" << e.code() << " message=\"" - << e.message() << "\"" - << " data=\"" << e.data() << "\"" << std::endl; - } catch (const std::exception &ex) { - stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[B->A EXCEPTION] " << ex.what() << std::endl; - } - - counter++; - std::this_thread::sleep_for(20ms); - } - }); - - // Progress - std::thread progress_thread([&]() { - while (running.load()) { - std::this_thread::sleep_for(30s); - if (!running.load()) - break; - - auto elapsed = std::chrono::steady_clock::now() - start_time; - auto elapsed_seconds = - std::chrono::duration_cast(elapsed).count(); - - std::cout << "[" << elapsed_seconds << "s] " - << "A->B: " << stats_a_to_b.successfulCalls() << "/" - << stats_a_to_b.totalCalls() << " | " - << "B->A: " << stats_b_to_a.successfulCalls() << "/" - << stats_b_to_a.totalCalls() << " | " - << "A rcvd: " << a_received.load() - << " | B rcvd: " << b_received.load() << std::endl; - } - }); - - while (std::chrono::steady_clock::now() - start_time < duration) { - std::this_thread::sleep_for(1s); - } - - running.store(false); - - thread_a_to_b.join(); - thread_b_to_a.join(); - progress_thread.join(); - - std::cout << "\n=== A -> B Statistics ===" << std::endl; - stats_a_to_b.printStats(); - - std::cout << "\n=== B -> A Statistics ===" << std::endl; - stats_b_to_a.printStats(); - - EXPECT_GT(stats_a_to_b.successfulCalls(), 0); - EXPECT_GT(stats_b_to_a.successfulCalls(), 0); - - room_a->localParticipant()->unregisterRpcMethod("ping"); - room_b->localParticipant()->unregisterRpcMethod("ping"); - room_a.reset(); - room_b.reset(); -} - -// High throughput stress test (short bursts) -TEST_F(RpcStressTest, HighThroughputBurst) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; - } - - std::cout << "\n=== High Throughput Burst Test ===" << std::endl; - std::cout << "Duration: " << config_.duration_seconds << " seconds" - << std::endl; - std::cout << "Testing rapid-fire RPC with max payload (15KB)..." << std::endl; - - auto receiver_room = std::make_unique(); - RoomOptions options; - options.auto_subscribe = true; - - bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; - - std::string receiver_identity = receiver_room->localParticipant()->identity(); - - std::atomic total_received{0}; - - receiver_room->localParticipant()->registerRpcMethod( - "burst-test", - [&total_received]( - const RpcInvocationData &data) -> std::optional { - total_received++; - // Echo the payload back for round-trip verification - return data.payload; - }); - - auto caller_room = std::make_unique(); - bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; - - bool receiver_visible = - waitForParticipant(caller_room.get(), receiver_identity, 10s); - ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; - - StressTestStats stats; - std::atomic running{true}; - - auto start_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::seconds(config_.duration_seconds); - - // Multiple threads sending as fast as possible - std::vector burst_threads; - for (int t = 0; t < config_.num_caller_threads * 2; ++t) { - burst_threads.emplace_back([&]() { - while (running.load()) { - std::string payload = generateRandomPayload(kMaxRpcPayloadSize); - - // Calculate expected checksum for verification - size_t expected_checksum = 0; - for (char c : payload) { - expected_checksum += static_cast(c); - } - - auto call_start = std::chrono::high_resolution_clock::now(); - - try { - std::string response = caller_room->localParticipant()->performRpc( - receiver_identity, "burst-test", payload, 60.0); - - auto call_end = std::chrono::high_resolution_clock::now(); - double latency_ms = - std::chrono::duration(call_end - call_start) - .count(); - - // Verify response by comparing checksum - size_t response_checksum = 0; - for (char c : response) { - response_checksum += static_cast(c); - } - - if (response.size() == payload.size() && - response_checksum == expected_checksum) { - stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); - } else { - stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); - std::cerr << "[BURST MISMATCH] sent size=" << payload.size() - << " checksum=" << expected_checksum - << " | received size=" << response.size() - << " checksum=" << response_checksum << std::endl; - } - } catch (const RpcError &e) { - stats.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[BURST RPC ERROR] code=" << e.code() << " message=\"" - << e.message() << "\"" - << " data=\"" << e.data() << "\"" << std::endl; - } catch (const std::exception &ex) { - stats.recordCall(false, 0, kMaxRpcPayloadSize); - std::cerr << "[BURST EXCEPTION] " << ex.what() << std::endl; - } - - // No delay - burst mode - } - }); - } - - // Progress - std::thread progress_thread([&]() { - int last_total = 0; - while (running.load()) { - std::this_thread::sleep_for(10s); - if (!running.load()) - break; - - int current = stats.totalCalls(); - double rate = (current - last_total) / 10.0; - last_total = current; - - auto elapsed = std::chrono::steady_clock::now() - start_time; - auto elapsed_seconds = - std::chrono::duration_cast(elapsed).count(); - - std::cout << "[" << elapsed_seconds << "s] " - << "Total: " << current - << " | Success: " << stats.successfulCalls() - << " | Rate: " << rate << " calls/sec" - << " | Throughput: " << (rate * kMaxRpcPayloadSize / 1024.0) - << " KB/sec" << std::endl; - } - }); - - while (std::chrono::steady_clock::now() - start_time < duration) { - std::this_thread::sleep_for(1s); - } - - running.store(false); - - for (auto &t : burst_threads) { - t.join(); - } - progress_thread.join(); - - stats.printStats(); - - auto total_time = std::chrono::duration_cast( - std::chrono::steady_clock::now() - start_time) - .count(); - double avg_rate = static_cast(stats.totalCalls()) / total_time; - double throughput_kbps = - (static_cast(stats.successfulCalls()) * kMaxRpcPayloadSize) / - (total_time * 1024.0); - - std::cout << "Average rate: " << avg_rate << " calls/sec" << std::endl; - std::cout << "Average throughput: " << throughput_kbps << " KB/sec" - << std::endl; - - EXPECT_GT(stats.successfulCalls(), 0); - - receiver_room->localParticipant()->unregisterRpcMethod("burst-test"); - caller_room.reset(); - receiver_room.reset(); -} - } // namespace test } // namespace livekit From 3ef8f77a3a000cba30dd35cbca444c937079cdf2 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Wed, 11 Feb 2026 19:30:21 -0800 Subject: [PATCH 2/3] try fixing the windows build --- vcpkg-configuration.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 1a167d0..66e9e02 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -3,6 +3,6 @@ "default-registry": { "kind": "git", "repository": "https://github.com/microsoft/vcpkg", - "baseline": "c82f74667287d3dc386bce81e44964370c91a289" + "baseline": "66c0373dc7fca549e5803087b9487edfe3aca0a1" } } From 402038633e8aab8afc3943ba51712f8633c71619 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 13 Feb 2026 14:57:10 -0800 Subject: [PATCH 3/3] fix windows CIs --- .github/workflows/builds.yml | 2 +- .github/workflows/make-release.yml | 2 +- vcpkg-configuration.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 766cec5..acf394c 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -78,7 +78,7 @@ jobs: if: runner.os == 'Windows' uses: lukka/run-vcpkg@v11 with: - vcpkgGitCommitId: 'c82f74667287d3dc386bce81e44964370c91a289' + vcpkgGitCommitId: 'fb87e2bb3fe69e16c224989acb5a61349166c782' # ---------- OS-specific deps ---------- - name: Install deps (Ubuntu) diff --git a/.github/workflows/make-release.yml b/.github/workflows/make-release.yml index 68ce7dc..b3edaec 100644 --- a/.github/workflows/make-release.yml +++ b/.github/workflows/make-release.yml @@ -91,7 +91,7 @@ jobs: if: runner.os == 'Windows' uses: lukka/run-vcpkg@v11 with: - vcpkgGitCommitId: 'c82f74667287d3dc386bce81e44964370c91a289' + vcpkgGitCommitId: 'fb87e2bb3fe69e16c224989acb5a61349166c782' # ---------- OS-specific deps ---------- - name: Install deps (Ubuntu) diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 66e9e02..dd51e03 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -3,6 +3,6 @@ "default-registry": { "kind": "git", "repository": "https://github.com/microsoft/vcpkg", - "baseline": "66c0373dc7fca549e5803087b9487edfe3aca0a1" + "baseline": "fb87e2bb3fe69e16c224989acb5a61349166c782" } }