Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions cmd/lk/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ var (
"can be used multiple times to publish multiple files. " +
"can publish from Unix or TCP socket using the format '<codec>://<socket_name>' or '<codec>://<host:address>' respectively. Valid codecs are \"h264\", \"h265\", \"vp8\", \"opus\"",
},
&cli.BoolFlag{
Name: "publish-user-timestamp",
Usage: "Parse H264 SEI for user timestamp and attach user timestamp trailer to each encoded frame",
Hidden: true,
},
&cli.FloatFlag{
Name: "fps",
Usage: "if video files are published, indicates FPS of video",
Expand Down Expand Up @@ -171,6 +176,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error {
if cmd.StringSlice("publish") != nil {
fps := cmd.Float("fps")
h26xStreamingFormat := cmd.String("h26x-streaming-format")
publishUserTimestamp := cmd.Bool("publish-user-timestamp")
for _, pub := range cmd.StringSlice("publish") {
onPublishComplete := func(pub *lksdk.LocalTrackPublication) {
if cmd.Bool("exit-after-publish") {
Expand All @@ -182,7 +188,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error {
_ = room.LocalParticipant.UnpublishTrack(pub.SID())
}
}
if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handlePublish(room, pub, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil {
return err
}
}
Expand All @@ -196,16 +202,17 @@ func handlePublish(room *lksdk.Room,
name string,
fps float64,
h26xStreamingFormat string,
publishUserTimestamp bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
if isSocketFormat(name) {
mimeType, socketType, address, err := parseSocketFromName(name)
if err != nil {
return err
}
return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, onPublishComplete)
return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete)
}
return publishFile(room, name, fps, h26xStreamingFormat, onPublishComplete)
return publishFile(room, name, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete)
}

func publishDemo(room *lksdk.Room) error {
Expand Down Expand Up @@ -238,6 +245,7 @@ func publishFile(room *lksdk.Room,
filename string,
fps float64,
h26xStreamingFormat string,
publishUserTimestamp bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
// Configure provider
Expand Down Expand Up @@ -273,6 +281,9 @@ func publishFile(room *lksdk.Room,
return fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat)
}
}
if publishUserTimestamp {
opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true))
}

// Create track and publish
track, err := lksdk.NewLocalFileTrack(filename, opts...)
Expand Down Expand Up @@ -325,6 +336,7 @@ func publishSocket(room *lksdk.Room,
address string,
fps float64,
h26xStreamingFormat string,
publishUserTimestamp bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
var mime string
Expand All @@ -348,7 +360,7 @@ func publishSocket(room *lksdk.Room,
}

// Publish to room
err = publishReader(room, sock, mime, fps, h26xStreamingFormat, onPublishComplete)
err = publishReader(room, sock, mime, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete)
return err
}

Expand All @@ -357,6 +369,7 @@ func publishReader(room *lksdk.Room,
mime string,
fps float64,
h26xStreamingFormat string,
publishUserTimestamp bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
// Configure provider
Expand Down Expand Up @@ -385,6 +398,10 @@ func publishReader(room *lksdk.Room,
}
}

if publishUserTimestamp {
opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true))
}

// Create track and publish
track, err := lksdk.NewLocalReaderTrack(in, mime, opts...)
if err != nil {
Expand Down Expand Up @@ -442,7 +459,7 @@ func parseSimulcastURL(url string) (*simulcastURLParts, error) {
}

// createSimulcastVideoTrack creates a simulcast video track from a TCP or Unix socket H.264/H.265 streams
func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, onComplete func()) (*lksdk.LocalTrack, error) {
func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, publishUserTimestamp bool, onComplete func()) (*lksdk.LocalTrack, error) {
conn, err := net.Dial(urlParts.network, urlParts.address)
if err != nil {
return nil, fmt.Errorf("failed to connect to %s://%s: %w", urlParts.network, urlParts.address, err)
Expand Down Expand Up @@ -470,6 +487,10 @@ func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.Vide
return nil, fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat)
}

if publishUserTimestamp {
opts = append(opts, lksdk.ReaderTrackWithUserTimestamp(true))
}

// Configure simulcast layer
opts = append(opts, lksdk.ReaderTrackWithSampleOptions(lksdk.WithSimulcast("simulcast", &livekit.VideoLayer{
Quality: quality,
Expand All @@ -493,7 +514,7 @@ type simulcastLayer struct {
}

// handleSimulcastPublish handles publishing multiple H.264 streams as a simulcast track
func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, onPublishComplete func(*lksdk.LocalTrackPublication)) error {
func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, publishUserTimestamp bool, onPublishComplete func(*lksdk.LocalTrackPublication)) error {
// Parse all URLs
var layers []simulcastLayer
for _, url := range urls {
Expand Down Expand Up @@ -564,7 +585,7 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt
}

for _, layer := range layers {
track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, signalCompletion)
track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, publishUserTimestamp, signalCompletion)
if err != nil {
// Clean up any tracks we've already created
for _, t := range tracks {
Expand Down
9 changes: 7 additions & 2 deletions cmd/lk/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ var (
Usage: "Format to use when reading H.264 from file or socket, \"annex-b\" OR \"length-prefixed\"",
Value: "annex-b",
},
&cli.BoolFlag{
Name: "publish-user-timestamp",
Usage: "When publishing H.264/H.265, attach the LKTS user timestamp trailer to each encoded frame (timestamp is sourced from preceding H.264 SEI user_data_unregistered when present)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mix of H.264/H.265 mention here. Mentions only H.264 SEI in details. Would be good to add H.265 details also.

Also, in join.go flag ☝️ , it only mentions H.264.

Also, join.go commands are deprecated. Would be good to not add new ones there.

},
&cli.BoolFlag{
Name: "exit-after-publish",
Usage: "When publishing, exit after file or stream is complete",
Expand Down Expand Up @@ -997,6 +1001,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
}

exitAfterPublish := cmd.Bool("exit-after-publish")
publishUserTimestamp := cmd.Bool("publish-user-timestamp")

// Handle publishing
if len(publishUrls) > 0 {
Expand All @@ -1015,7 +1020,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
}
}

if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil {
return err
}
} else {
Expand All @@ -1033,7 +1038,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
_ = room.LocalParticipant.UnpublishTrack(pub.SID())
}
}
if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handlePublish(room, pub, fps, h26xStreamingFormat, publishUserTimestamp, onPublishComplete); err != nil {
return err
}
}
Expand Down