diff --git a/cmd/lk/join.go b/cmd/lk/join.go index fb534f30..9a94b0b2 100644 --- a/cmd/lk/join.go +++ b/cmd/lk/join.go @@ -61,6 +61,11 @@ var ( "can be used multiple times to publish multiple files. " + "can publish from Unix or TCP socket using the format '://' or '://' respectively. Valid codecs are \"h264\", \"h265\", \"vp8\", \"opus\"", }, + &cli.BoolFlag{ + Name: "publish-user-timestamp", + Usage: "Parse H264 SEI for user timestamp and attach user timestamp trailer to each encoded frame", + Hidden: true, + }, &cli.FloatFlag{ Name: "fps", Usage: "if video files are published, indicates FPS of video", @@ -171,6 +176,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error { if cmd.StringSlice("publish") != nil { fps := cmd.Float("fps") h26xStreamingFormat := cmd.String("h26x-streaming-format") + publishUserTimestamp := cmd.Bool("publish-user-timestamp") for _, pub := range cmd.StringSlice("publish") { onPublishComplete := func(pub *lksdk.LocalTrackPublication) { if cmd.Bool("exit-after-publish") { @@ -182,7 +188,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error { _ = room.LocalParticipant.UnpublishTrack(pub.SID()) } } - if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handlePublish(room, pub, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil { return err } } @@ -196,6 +202,7 @@ func handlePublish(room *lksdk.Room, name string, fps float64, h26xStreamingFormat string, + publishUserTimestamp bool, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { if isSocketFormat(name) { @@ -203,9 +210,9 @@ func handlePublish(room *lksdk.Room, if err != nil { return err } - return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, onPublishComplete) + return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete) } - return publishFile(room, name, fps, h26xStreamingFormat, onPublishComplete) + return publishFile(room, name, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete) } func publishDemo(room *lksdk.Room) error { @@ -238,6 +245,7 @@ func publishFile(room *lksdk.Room, filename string, fps float64, h26xStreamingFormat string, + publishUserTimestamp bool, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { // Configure provider @@ -273,6 +281,9 @@ func publishFile(room *lksdk.Room, return fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat) } } + if publishUserTimestamp { + opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true)) + } // Create track and publish track, err := lksdk.NewLocalFileTrack(filename, opts...) @@ -325,6 +336,7 @@ func publishSocket(room *lksdk.Room, address string, fps float64, h26xStreamingFormat string, + publishUserTimestamp bool, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { var mime string @@ -348,7 +360,7 @@ func publishSocket(room *lksdk.Room, } // Publish to room - err = publishReader(room, sock, mime, fps, h26xStreamingFormat, onPublishComplete) + err = publishReader(room, sock, mime, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete) return err } @@ -357,6 +369,7 @@ func publishReader(room *lksdk.Room, mime string, fps float64, h26xStreamingFormat string, + publishUserTimestamp bool, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { // Configure provider @@ -385,6 +398,10 @@ func publishReader(room *lksdk.Room, } } + if publishUserTimestamp { + opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true)) + } + // Create track and publish track, err := lksdk.NewLocalReaderTrack(in, mime, opts...) if err != nil { @@ -442,7 +459,7 @@ func parseSimulcastURL(url string) (*simulcastURLParts, error) { } // createSimulcastVideoTrack creates a simulcast video track from a TCP or Unix socket H.264/H.265 streams -func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, onComplete func()) (*lksdk.LocalTrack, error) { +func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, publishUserTimestamp bool, onComplete func()) (*lksdk.LocalTrack, error) { conn, err := net.Dial(urlParts.network, urlParts.address) if err != nil { return nil, fmt.Errorf("failed to connect to %s://%s: %w", urlParts.network, urlParts.address, err) @@ -470,6 +487,10 @@ func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.Vide return nil, fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat) } + if publishUserTimestamp { + opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true)) + } + // Configure simulcast layer opts = append(opts, lksdk.ReaderTrackWithSampleOptions(lksdk.WithSimulcast("simulcast", &livekit.VideoLayer{ Quality: quality, @@ -493,7 +514,7 @@ type simulcastLayer struct { } // handleSimulcastPublish handles publishing multiple H.264 streams as a simulcast track -func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, onPublishComplete func(*lksdk.LocalTrackPublication)) error { +func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, publishUserTimestamp bool, onPublishComplete func(*lksdk.LocalTrackPublication)) error { // Parse all URLs var layers []simulcastLayer for _, url := range urls { @@ -564,7 +585,7 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt } for _, layer := range layers { - track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, signalCompletion) + track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, publishUserTimestamp, signalCompletion) if err != nil { // Clean up any tracks we've already created for _, t := range tracks { diff --git a/cmd/lk/room.go b/cmd/lk/room.go index 2f206ccd..34293338 100644 --- a/cmd/lk/room.go +++ b/cmd/lk/room.go @@ -177,6 +177,10 @@ var ( Usage: "Format to use when reading H.264 from file or socket, \"annex-b\" OR \"length-prefixed\"", Value: "annex-b", }, + &cli.BoolFlag{ + Name: "publish-user-timestamp", + Usage: "When publishing H.264/H.265, attach the LKTS user timestamp trailer to each encoded frame (timestamp is sourced from preceding H.264 SEI user_data_unregistered when present)", + }, &cli.BoolFlag{ Name: "exit-after-publish", Usage: "When publishing, exit after file or stream is complete", @@ -997,6 +1001,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { } exitAfterPublish := cmd.Bool("exit-after-publish") + publishUserTimestamp := cmd.Bool("publish-user-timestamp") // Handle publishing if len(publishUrls) > 0 { @@ -1015,7 +1020,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { } } - if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil { return err } } else { @@ -1033,7 +1038,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { _ = room.LocalParticipant.UnpublishTrack(pub.SID()) } } - if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handlePublish(room, pub, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil { return err } }