diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3d36664..d79acb5 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -116,6 +116,19 @@ target_link_libraries(SimpleDataStream livekit ) +add_executable(PublishYuvSource + publish_yuv_source/main.cpp + publish_yuv_source/yuv_source.cpp + publish_yuv_source/yuv_source.h +) + +target_include_directories(PublishYuvSource PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS}) + +target_link_libraries(PublishYuvSource + PRIVATE + livekit +) + add_custom_command( TARGET SimpleDataStream POST_BUILD @@ -135,7 +148,7 @@ if(WIN32) ) # Copy DLLs to each example's output directory - foreach(EXAMPLE SimpleRoom SimpleRpc SimpleDataStream) + foreach(EXAMPLE SimpleRoom SimpleRpc SimpleDataStream PublishYuvSource) foreach(DLL ${REQUIRED_DLLS}) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different @@ -159,7 +172,7 @@ if(UNIX) endif() # Copy shared library to each example's output directory - foreach(EXAMPLE SimpleRoom SimpleRpc SimpleDataStream) + foreach(EXAMPLE SimpleRoom SimpleRpc SimpleDataStream PublishYuvSource) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "${LIVEKIT_LIB_DIR}/${FFI_SHARED_LIB}" diff --git a/examples/publish_yuv_source/README.md b/examples/publish_yuv_source/README.md new file mode 100644 index 0000000..43d90f6 --- /dev/null +++ b/examples/publish_yuv_source/README.md @@ -0,0 +1,11 @@ +### Generate yuv nv12 stream with gstreamer + +```bash +gst-launch-1.0 avfvideosrc device-index=0 ! videoconvert ! videorate ! videoscale ! video/x-raw,format=NV12,width=1280,height=720,framerate=30/1 ! queue ! tcpserversink host=0.0.0.0 port=5004 sync=false +``` + +### Publish stream to LiveKit + +```bash +./build-release/bin/PublishYuvSource --url wss://... --token --tcp 0.0.0.0:5004 --raw-width 1280 --raw-height 720 --raw-fps 30 +``` diff --git a/examples/publish_yuv_source/main.cpp b/examples/publish_yuv_source/main.cpp new file mode 100644 index 0000000..e36d446 --- /dev/null +++ b/examples/publish_yuv_source/main.cpp @@ -0,0 +1,283 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "livekit/livekit.h" +#include "yuv_source.h" + +using namespace livekit; + +namespace { + +std::atomic g_running{true}; + +void printUsage(const char *prog) { + std::cerr << "Usage: " << prog + << " --url --token --tcp [options]\n\n" + << " --url LiveKit WebSocket URL\n" + << " --token JWT token\n" + << " --enable_e2ee Enable E2EE\n" + << " --e2ee_key E2EE shared key\n\n" + << " --tcp TCP server for raw NV12 (default " + "127.0.0.1:5004)\n" + << " --raw-width Frame width (default: 1280)\n" + << " --raw-height Frame height (default: 720)\n" + << " --raw-fps Frame rate (default: 30)\n\n" + << "Env: LIVEKIT_URL, LIVEKIT_TOKEN, LIVEKIT_E2EE_KEY\n"; +} + +void handleSignal(int) { g_running.store(false); } + +struct RawNv12Args { + std::string host = "127.0.0.1"; + std::uint16_t port = 5004; + int width = 1280; + int height = 720; + int fps = 30; +}; + +bool parseArgs(int argc, char *argv[], std::string &url, std::string &token, + bool &enable_e2ee, std::string &e2ee_key, + RawNv12Args &raw_nv12) { + enable_e2ee = false; + raw_nv12 = RawNv12Args{}; + auto get_flag_value = [&](const std::string &name, int &i) -> std::string { + std::string arg = argv[i]; + const std::string eq = name + "="; + if (arg.rfind(name, 0) == 0) { + if (arg.size() > name.size() && arg[name.size()] == '=') + return arg.substr(eq.size()); + if (i + 1 < argc) + return std::string(argv[++i]); + } + return {}; + }; + + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "-h" || a == "--help") + return false; + if (a == "--enable_e2ee") { + enable_e2ee = true; + continue; + } + if (a.rfind("--tcp", 0) == 0) { + std::string v = get_flag_value("--tcp", i); + if (v.empty()) + v = "127.0.0.1:5004"; + size_t colon = v.find(':'); + if (colon != std::string::npos) { + raw_nv12.host = v.substr(0, colon); + try { + raw_nv12.port = + static_cast(std::stoul(v.substr(colon + 1))); + } catch (...) { + raw_nv12.port = 5004; + } + } else { + raw_nv12.host = v; + } + continue; + } + if (a.rfind("--raw-width", 0) == 0) { + std::string v = get_flag_value("--raw-width", i); + if (!v.empty()) + try { + raw_nv12.width = std::stoi(v); + } catch (...) { + } + continue; + } + if (a.rfind("--raw-height", 0) == 0) { + std::string v = get_flag_value("--raw-height", i); + if (!v.empty()) + try { + raw_nv12.height = std::stoi(v); + } catch (...) { + } + continue; + } + if (a.rfind("--raw-fps", 0) == 0) { + std::string v = get_flag_value("--raw-fps", i); + if (!v.empty()) + try { + raw_nv12.fps = std::stoi(v); + } catch (...) { + } + continue; + } + if (a.rfind("--url", 0) == 0) { + std::string v = get_flag_value("--url", i); + if (!v.empty()) + url = v; + continue; + } + if (a.rfind("--token", 0) == 0) { + std::string v = get_flag_value("--token", i); + if (!v.empty()) + token = v; + continue; + } + if (a.rfind("--e2ee_key", 0) == 0) { + std::string v = get_flag_value("--e2ee_key", i); + if (!v.empty()) + e2ee_key = v; + } + } + + if (url.empty()) { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + } + if (token.empty()) { + const char *e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (e2ee_key.empty()) { + const char *e = std::getenv("LIVEKIT_E2EE_KEY"); + if (e) + e2ee_key = e; + } + return !(url.empty() || token.empty()); +} + +class LoggingDelegate : public livekit::RoomDelegate { +public: + void onParticipantConnected( + livekit::Room &, const livekit::ParticipantConnectedEvent &ev) override { + std::cout << "[Room] participant connected: " << ev.participant->identity() + << "\n"; + } + void onTrackSubscribed(livekit::Room &, + const livekit::TrackSubscribedEvent &ev) override { + std::cout << "[Room] track subscribed: " + << (ev.publication ? ev.publication->name() : "?") << "\n"; + } +}; + +static std::vector toBytes(const std::string &s) { + return std::vector(s.begin(), s.end()); +} + +} // namespace + +int main(int argc, char *argv[]) { + std::string url, token, e2ee_key; + bool enable_e2ee = false; + RawNv12Args raw_nv12; + if (!parseArgs(argc, argv, url, token, enable_e2ee, e2ee_key, raw_nv12)) { + printUsage(argv[0]); + return 1; + } + if (url.empty() || token.empty()) { + std::cerr + << "LIVEKIT_URL and LIVEKIT_TOKEN (or --url/--token) are required\n"; + return 1; + } + + std::signal(SIGINT, handleSignal); + livekit::initialize(livekit::LogSink::kConsole); + + auto room = std::make_unique(); + LoggingDelegate delegate; + room->setDelegate(&delegate); + + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + if (enable_e2ee) { + livekit::E2EEOptions enc; + enc.encryption_type = livekit::EncryptionType::GCM; + if (!e2ee_key.empty()) + enc.key_provider_options.shared_key = toBytes(e2ee_key); + options.encryption = enc; + } + + if (!room->Connect(url, token, options)) { + std::cerr << "Failed to connect\n"; + livekit::shutdown(); + return 1; + } + std::cout << "Connected to room: " << room->room_info().name << "\n"; + + const int width = raw_nv12.width; + const int height = raw_nv12.height; + const std::size_t expected_size = static_cast(width) * + static_cast(height) * 3 / 2; + + auto videoSource = std::make_shared(width, height); + auto videoTrack = + LocalVideoTrack::createLocalVideoTrack("yuv_source", videoSource); + TrackPublishOptions videoOpts; + videoOpts.source = TrackSource::SOURCE_CAMERA; + videoOpts.dtx = false; + videoOpts.simulcast = true; + videoOpts.video_codec = static_cast(1); // H264 + + std::shared_ptr videoPub; + try { + videoPub = room->localParticipant()->publishTrack(videoTrack, videoOpts); + std::cout << "Published video track: SID=" << videoPub->sid() + << " name=" << videoPub->name() << "\n"; + } catch (const std::exception &e) { + std::cerr << "Failed to publish track: " << e.what() << "\n"; + livekit::shutdown(); + return 1; + } + + auto yuvSource = std::make_unique( + raw_nv12.host, raw_nv12.port, width, height, raw_nv12.fps, + [videoSource, expected_size, width, height](publish_yuv::YuvFrame frame) { + if (frame.data.size() != expected_size) { + std::cerr << "Raw NV12 frame size mismatch\n"; + return; + } + try { + VideoFrame vf(width, height, VideoBufferType::NV12, + std::move(frame.data)); + videoSource->captureFrame(vf, frame.timestamp_us, + VideoRotation::VIDEO_ROTATION_0); + } catch (const std::exception &e) { + std::cerr << "captureFrame: " << e.what() << "\n"; + } + }); + yuvSource->start(); + + while (g_running.load()) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + yuvSource->stop(); + room->setDelegate(nullptr); + if (videoPub) + room->localParticipant()->unpublishTrack(videoPub->sid()); + room.reset(); + livekit::shutdown(); + std::cout << "Exiting.\n"; + return 0; +} diff --git a/examples/publish_yuv_source/yuv_source.cpp b/examples/publish_yuv_source/yuv_source.cpp new file mode 100644 index 0000000..763c872 --- /dev/null +++ b/examples/publish_yuv_source/yuv_source.cpp @@ -0,0 +1,153 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "yuv_source.h" + +#include +#include + +#ifdef _WIN32 +#include +#include +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET socket_t; +#define INVALID_SOCKET_VALUE INVALID_SOCKET +#define close_socket closesocket +#else +#include +#include +#include +#include +typedef int socket_t; +#define INVALID_SOCKET_VALUE (-1) +#define close_socket close +#endif + +namespace publish_yuv { + +namespace { + +socket_t connectTcp(const std::string &host, std::uint16_t port) { +#ifdef _WIN32 + WSADATA wsa; + if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) + return INVALID_SOCKET_VALUE; +#endif + struct addrinfo hints = {}, *res = nullptr; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + std::string portStr = std::to_string(port); + if (getaddrinfo(host.c_str(), portStr.c_str(), &hints, &res) != 0) { + std::cerr << "YuvSource: getaddrinfo failed for " << host << ":" << port + << "\n"; + return INVALID_SOCKET_VALUE; + } + socket_t fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (fd == INVALID_SOCKET_VALUE) { + freeaddrinfo(res); + return INVALID_SOCKET_VALUE; + } + if (connect(fd, res->ai_addr, static_cast(res->ai_addrlen)) != 0) { + close_socket(fd); + freeaddrinfo(res); + return INVALID_SOCKET_VALUE; + } + freeaddrinfo(res); + return fd; +} + +} // namespace + +YuvSource::YuvSource(const std::string &host, std::uint16_t port, int width, + int height, int fps, YuvFrameCallback callback) + : host_(host), port_(port), width_(width), height_(height), fps_(fps), + callback_(std::move(callback)) { + if (width_ > 0 && height_ > 0) { + frame_size_ = static_cast(width_) * + static_cast(height_) * 3 / 2; + } +} + +YuvSource::~YuvSource() { stop(); } + +void YuvSource::start() { + if (running_.exchange(true)) + return; + thread_ = std::thread(&YuvSource::loop, this); +} + +void YuvSource::stop() { + running_.store(false); + if (thread_.joinable()) + thread_.join(); +} + +void YuvSource::loop() { + if (frame_size_ == 0) { + std::cerr << "YuvSource: invalid frame size\n"; + running_.store(false); + return; + } + + socket_t fd = connectTcp(host_, port_); + if (fd == INVALID_SOCKET_VALUE) { + std::cerr << "YuvSource: failed to connect to " << host_ << ":" << port_ + << "\n"; + running_.store(false); + return; + } + + std::cout << "YuvSource: connected to " << host_ << ":" << port_ << " (" + << width_ << "x" << height_ << "@" << fps_ + << "fps, frame=" << frame_size_ << " bytes)\n"; + + auto t0 = std::chrono::steady_clock::now(); + + while (running_.load()) { + std::vector frame(frame_size_); + std::size_t filled = 0; + while (filled < frame_size_ && running_.load()) { +#ifdef _WIN32 + int n = recv(fd, reinterpret_cast(frame.data() + filled), + static_cast(frame_size_ - filled), 0); +#else + ssize_t n = recv(fd, frame.data() + filled, frame_size_ - filled, 0); +#endif + if (n <= 0) { + running_.store(false); + break; + } + filled += static_cast(n); + } + if (!running_.load() || filled < frame_size_) + break; + + std::int64_t ts_us = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + if (callback_) { + YuvFrame out; + out.data = std::move(frame); + out.timestamp_us = ts_us; + callback_(std::move(out)); + } + } + + close_socket(fd); + running_.store(false); +} + +} // namespace publish_yuv diff --git a/examples/publish_yuv_source/yuv_source.h b/examples/publish_yuv_source/yuv_source.h new file mode 100644 index 0000000..62adc09 --- /dev/null +++ b/examples/publish_yuv_source/yuv_source.h @@ -0,0 +1,63 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace publish_yuv { + +struct YuvFrame { + std::vector data; + std::int64_t timestamp_us{0}; +}; + +using YuvFrameCallback = std::function; + +/** + * Reads raw NV12 frames from a TCP server (fixed-size frames). + * Runs a background thread; call stop() to disconnect. + */ +class YuvSource { +public: + YuvSource(const std::string &host, std::uint16_t port, int width, int height, + int fps, YuvFrameCallback callback); + ~YuvSource(); + + void start(); + void stop(); + bool running() const { return running_.load(); } + +private: + void loop(); + + std::string host_; + std::uint16_t port_; + int width_; + int height_; + int fps_; + std::size_t frame_size_{0}; + YuvFrameCallback callback_; + std::atomic running_{false}; + std::thread thread_; +}; + +} // namespace publish_yuv