From 6be981cc9d81109ebcc75b8bc963b14026785996 Mon Sep 17 00:00:00 2001 From: zesun Date: Wed, 20 Aug 2025 22:54:22 +0800 Subject: [PATCH 1/6] Add stress-test benchmark to connect a livekit server for multiple rooms --- cmd/lk/perf.go | 162 +++++++++++++++++++++++++++++++++++ pkg/loadtester/loadtest.go | 121 +++++++++++++++++++++++++- pkg/loadtester/loadtester.go | 9 +- 3 files changed, 287 insertions(+), 5 deletions(-) diff --git a/cmd/lk/perf.go b/cmd/lk/perf.go index 35828f28..40f7a475 100644 --- a/cmd/lk/perf.go +++ b/cmd/lk/perf.go @@ -84,6 +84,70 @@ var ( }, }, }, + { + Name: "stress-test", + Usage: "Run stress tests against LiveKit with simulated publishers & subscribers", + Action: stressTest, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "rooms", + Usage: "`NUMBER` of rooms to open", + }, + &cli.StringFlag{ + Name: "room-prefix", + Usage: "Room `PREFIX` of tester participants (defaults to a random prefix)", + }, + &cli.DurationFlag{ + Name: "duration", + Usage: "`TIME` duration to run, 1m, 1h (by default will run until canceled)", + Value: 0, + }, + &cli.IntFlag{ + Name: "video-publishers", + Aliases: []string{"publishers"}, + Usage: "`NUMBER` of participants that would publish video tracks", + }, + &cli.IntFlag{ + Name: "audio-publishers", + Usage: "`NUMBER` of participants that would publish audio tracks", + }, + &cli.IntFlag{ + Name: "subscribers", + Usage: "`NUMBER` of participants that would subscribe to tracks", + }, + &cli.StringFlag{ + Name: "identity-prefix", + Usage: "Identity `PREFIX` of tester participants (defaults to a random prefix)", + }, + &cli.StringFlag{ + Name: "video-resolution", + Usage: "Resolution `QUALITY` of video to publish (\"high\", \"medium\", or \"low\")", + Value: "high", + }, + &cli.StringFlag{ + Name: "video-codec", + Usage: "`CODEC` \"h264\" or \"vp8\", both will be used when unset", + }, + &cli.FloatFlag{ + Name: "num-per-second", + Usage: "`NUMBER` of testers to start every second", + Value: 5, + }, + &cli.StringFlag{ + Name: "layout", + Usage: "`LAYOUT` to simulate, choose from \"speaker\", \"3x3\", \"4x4\", \"5x5\"", + Value: "speaker", + }, + &cli.BoolFlag{ + Name: "no-simulcast", + Usage: "Disables simulcast publishing (simulcast is enabled by default)", + }, + &cli.BoolFlag{ + Name: "simulate-speakers", + Usage: "Fire random speaker events to simulate speaker changes", + }, + }, + }, { Name: "agent-load-test", Usage: "Run load tests for a running agent", @@ -177,6 +241,70 @@ var ( }, }, }, + { + Name: "stress-test", + Usage: "Run stress tests against LiveKit with simulated publishers & subscribers", + Action: stressTest, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "rooms", + Usage: "`NUMBER` of rooms to open", + }, + &cli.StringFlag{ + Name: "room-prefix", + Usage: "Room `PREFIX` of tester participants (defaults to a random prefix)", + }, + &cli.DurationFlag{ + Name: "duration", + Usage: "`TIME` duration to run, 1m, 1h (by default will run until canceled)", + Value: 0, + }, + &cli.IntFlag{ + Name: "video-publishers", + Aliases: []string{"publishers"}, + Usage: "`NUMBER` of participants that would publish video tracks", + }, + &cli.IntFlag{ + Name: "audio-publishers", + Usage: "`NUMBER` of participants that would publish audio tracks", + }, + &cli.IntFlag{ + Name: "subscribers", + Usage: "`NUMBER` of participants that would subscribe to tracks", + }, + &cli.StringFlag{ + Name: "identity-prefix", + Usage: "Identity `PREFIX` of tester participants (defaults to a random prefix)", + }, + &cli.StringFlag{ + Name: "video-resolution", + Usage: "Resolution `QUALITY` of video to publish (\"high\", \"medium\", or \"low\")", + Value: "high", + }, + &cli.StringFlag{ + Name: "video-codec", + Usage: "`CODEC` \"h264\" or \"vp8\", both will be used when unset", + }, + &cli.FloatFlag{ + Name: "num-per-second", + Usage: "`NUMBER` of testers to start every second", + Value: 5, + }, + &cli.StringFlag{ + Name: "layout", + Usage: "`LAYOUT` to simulate, choose from \"speaker\", \"3x3\", \"4x4\", \"5x5\"", + Value: "speaker", + }, + &cli.BoolFlag{ + Name: "no-simulcast", + Usage: "Disables simulcast publishing (simulcast is enabled by default)", + }, + &cli.BoolFlag{ + Name: "simulate-speakers", + Usage: "Fire random speaker events to simulate speaker changes", + }, + }, + }, } ) @@ -225,6 +353,40 @@ func loadTest(ctx context.Context, cmd *cli.Command) error { return test.Run(ctx) } +func stressTest(ctx context.Context, cmd *cli.Command) error { + pc, err := loadProjectDetails(cmd) + if err != nil { + return err + } + if !cmd.Bool("verbose") { + lksdk.SetLogger(logger.LogRLogger(logr.Discard())) + } + _ = raiseULimit() + + params := loadtester.Params{ + VideoResolution: cmd.String("video-resolution"), + VideoCodec: cmd.String("video-codec"), + Duration: cmd.Duration("duration"), + NumPerSecond: cmd.Float("num-per-second"), + Simulcast: !cmd.Bool("no-simulcast"), + SimulateSpeakers: cmd.Bool("simulate-speakers"), + TesterParams: loadtester.TesterParams{ + URL: pc.URL, + APIKey: pc.APIKey, + APISecret: pc.APISecret, + IdentityPrefix: cmd.String("identity-prefix"), + Layout: loadtester.LayoutFromString(cmd.String("layout")), + }, + } + params.Rooms = int(cmd.Int("rooms")) + params.RoomPrefix = cmd.String("room-prefix") + params.VideoPublishers = int(cmd.Int("video-publishers")) + params.AudioPublishers = int(cmd.Int("audio-publishers")) + params.Subscribers = int(cmd.Int("subscribers")) + test := loadtester.NewLoadTest(params) + return test.RunStress(ctx) +} + func agentLoadTest(ctx context.Context, cmd *cli.Command) error { pc, err := loadProjectDetails(cmd) if err != nil { diff --git a/pkg/loadtester/loadtest.go b/pkg/loadtester/loadtest.go index a17ae98a..3a0e7a38 100644 --- a/pkg/loadtester/loadtest.go +++ b/pkg/loadtester/loadtest.go @@ -41,6 +41,7 @@ type LoadTest struct { } type Params struct { + Rooms int VideoPublishers int AudioPublishers int Subscribers int @@ -271,6 +272,124 @@ func (t *LoadTest) RunSuite(ctx context.Context) error { return nil } +func (t *LoadTest) RunStress(ctx context.Context) error { + parsedUrl, err := url.Parse(t.Params.URL) + if err != nil { + return err + } + if strings.HasSuffix(parsedUrl.Hostname(), ".livekit.cloud") { + if t.Params.VideoPublishers > 50 || t.Params.Subscribers > 50 || t.Params.AudioPublishers > 50 { + return errors.New("Unable to perform load test on LiveKit Cloud. Load testing is prohibited by our acceptable use policy: https://livekit.io/legal/acceptable-use-policy") + } + } + + if t.Params.RoomPrefix == "" { + t.Params.RoomPrefix = randStringRunes(5) + } + + roomsStats := make(map[string]map[string]*testerStats) + roomsStatsMutex := sync.Mutex{} + + wg := sync.WaitGroup{} + wg.Add(t.Params.Rooms) + for i := 0; i < t.Params.Rooms; i++ { + roomName := fmt.Sprintf("%s_%d", t.Params.RoomPrefix, i) + params := t.Params + params.Room = roomName + + go func() { + defer wg.Done() + stats, err := t.run(ctx, params) + if err != nil { + return + } + roomsStatsMutex.Lock() + defer roomsStatsMutex.Unlock() + roomsStats[roomName] = stats + }() + } + wg.Wait() + + var totals summary + var summariesLen int64 + + summaryTable := util.CreateTable(). + Headers("Room", "Tracks", "Bitrate", "Total Pkt. Loss", "Error"). + StyleFunc(func(row, col int) lipgloss.Style { + if row == table.HeaderRow { + return util.FormHeaderStyle + } + return util.FormBaseStyle + }) + + for roomName, stats := range roomsStats { + // tester results + summaries := make(map[string]*summary) + names := make([]string, 0, len(stats)) + for name := range stats { + if strings.HasPrefix(name, "Pub") { + continue + } + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + testerStats := stats[name] + summaries[name] = getTesterSummary(testerStats) + trackStatsSlice := make([]*trackStats, 0, len(testerStats.trackStats)) + for _, ts := range testerStats.trackStats { + trackStatsSlice = append(trackStatsSlice, ts) + } + sort.Slice(trackStatsSlice, func(i, j int) bool { + return strings.Compare( + string(trackStatsSlice[i].kind), + string(trackStatsSlice[j].kind), + ) < 0 + }) + } + + if len(summaries) == 0 { + continue + } + { + // totals row + s := getTestSummary(summaries) + sDropped := formatLossRate(s.packets, s.dropped) + // avg bitrate per sub + sBitrate := fmt.Sprintf("%s (%s avg)", + formatBitrate(s.bytes, s.elapsed), + formatBitrate(s.bytes/int64(len(summaries)), s.elapsed), + ) + summaryTable.Row(roomName, fmt.Sprintf("%d/%d", s.tracks, s.expected), sBitrate, sDropped, string(s.errCount)) + + totals.tracks += s.tracks + totals.expected += s.expected + totals.packets += s.packets + totals.bytes += s.bytes + totals.dropped += s.dropped + totals.elapsed += s.elapsed + totals.errCount += s.errCount + summariesLen += int64(len(summaries)) + } + } + + { + sDropped := formatLossRate(totals.packets, totals.dropped) + sBitrate := fmt.Sprintf("%s (%s avg)", + formatBitrate(totals.bytes*int64(len(roomsStats)), totals.elapsed), + formatBitrate((totals.bytes/int64(len(roomsStats))), totals.elapsed), + ) + + summaryTable.Row("Total", fmt.Sprintf("%d/%d", totals.tracks, totals.expected), sBitrate, sDropped, string(totals.errCount)) + } + + fmt.Println("\nRoom summaries:") + fmt.Println(summaryTable) + + return nil +} + func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerStats, error) { if params.Room == "" { params.Room = fmt.Sprintf("testroom%d", rand.Int31n(1000)) @@ -387,7 +506,7 @@ func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerSt // a really long time duration = 1000 * time.Hour } - fmt.Printf("Finished connecting to room, waiting %s\n", duration.String()) + fmt.Printf("Finished connecting to room %s, waiting %s\n", params.Room, duration.String()) select { case <-ctx.Done(): diff --git a/pkg/loadtester/loadtester.go b/pkg/loadtester/loadtester.go index 95fdaa06..fc4f23fe 100644 --- a/pkg/loadtester/loadtester.go +++ b/pkg/loadtester/loadtester.go @@ -79,6 +79,7 @@ type TesterParams struct { APIKey string APISecret string Room string + RoomPrefix string IdentityPrefix string Layout Layout // true to subscribe to all published tracks @@ -152,7 +153,7 @@ func (t *LoadTester) PublishAudioTrack(name string) (string, error) { return "", nil } - fmt.Println("publishing audio track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room ", t.room.Name(), "audio track -", t.room.LocalParticipant.Identity()) audioLooper, err := provider2.CreateAudioLooper() if err != nil { return "", err @@ -179,7 +180,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, return "", nil } - fmt.Println("publishing video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room ", t.room.Name(), " video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, false) if err != nil { return "", err @@ -204,7 +205,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, func (t *LoadTester) PublishSimulcastTrack(name, resolution, codec string) (string, error) { var tracks []*lksdk.LocalTrack - fmt.Println("publishing simulcast video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room ", t.room.Name(), " simulcast video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, true) if err != nil { return "", err @@ -317,7 +318,7 @@ func (t *LoadTester) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Rem kind: pub.Kind(), } t.stats.Store(track.ID(), s) - fmt.Println("subscribed to track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal)) + fmt.Println("subscribed to room ", t.room.Name(), " - track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal)) // consume track go t.consumeTrack(track, pub, rp) From 944edbc9f607bc413a43b2de5539edb900760882 Mon Sep 17 00:00:00 2001 From: zesun Date: Wed, 20 Aug 2025 23:21:48 +0800 Subject: [PATCH 2/6] Fixed calculation errors --- pkg/loadtester/loadtest.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loadtester/loadtest.go b/pkg/loadtester/loadtest.go index 3a0e7a38..5ba246f5 100644 --- a/pkg/loadtester/loadtest.go +++ b/pkg/loadtester/loadtest.go @@ -378,7 +378,7 @@ func (t *LoadTest) RunStress(ctx context.Context) error { sDropped := formatLossRate(totals.packets, totals.dropped) sBitrate := fmt.Sprintf("%s (%s avg)", formatBitrate(totals.bytes*int64(len(roomsStats)), totals.elapsed), - formatBitrate((totals.bytes/int64(len(roomsStats))), totals.elapsed), + formatBitrate((totals.bytes*int64(len(roomsStats))/summariesLen), totals.elapsed), ) summaryTable.Row("Total", fmt.Sprintf("%d/%d", totals.tracks, totals.expected), sBitrate, sDropped, string(totals.errCount)) From bf7a9f3a0de41e63f6acea6ec1717f2866dadbee Mon Sep 17 00:00:00 2001 From: zesun Date: Wed, 20 Aug 2025 23:35:08 +0800 Subject: [PATCH 3/6] fix Printl format --- pkg/loadtester/loadtester.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/loadtester/loadtester.go b/pkg/loadtester/loadtester.go index fc4f23fe..4b8d6332 100644 --- a/pkg/loadtester/loadtester.go +++ b/pkg/loadtester/loadtester.go @@ -153,7 +153,7 @@ func (t *LoadTester) PublishAudioTrack(name string) (string, error) { return "", nil } - fmt.Println("publishing room ", t.room.Name(), "audio track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room", t.room.Name(), "audio track -", t.room.LocalParticipant.Identity()) audioLooper, err := provider2.CreateAudioLooper() if err != nil { return "", err @@ -180,7 +180,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, return "", nil } - fmt.Println("publishing room ", t.room.Name(), " video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room", t.room.Name(), " video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, false) if err != nil { return "", err @@ -205,7 +205,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, func (t *LoadTester) PublishSimulcastTrack(name, resolution, codec string) (string, error) { var tracks []*lksdk.LocalTrack - fmt.Println("publishing room ", t.room.Name(), " simulcast video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room", t.room.Name(), " simulcast video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, true) if err != nil { return "", err From f6d8a6b809b1f2e735a4a8c048745df9201d1c84 Mon Sep 17 00:00:00 2001 From: zesun Date: Wed, 20 Aug 2025 23:36:35 +0800 Subject: [PATCH 4/6] fix Printl format 2 --- pkg/loadtester/loadtester.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loadtester/loadtester.go b/pkg/loadtester/loadtester.go index 4b8d6332..d70b1015 100644 --- a/pkg/loadtester/loadtester.go +++ b/pkg/loadtester/loadtester.go @@ -180,7 +180,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, return "", nil } - fmt.Println("publishing room", t.room.Name(), " video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room", t.room.Name(), "video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, false) if err != nil { return "", err @@ -205,7 +205,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string, func (t *LoadTester) PublishSimulcastTrack(name, resolution, codec string) (string, error) { var tracks []*lksdk.LocalTrack - fmt.Println("publishing room", t.room.Name(), " simulcast video track -", t.room.LocalParticipant.Identity()) + fmt.Println("publishing room", t.room.Name(), "simulcast video track -", t.room.LocalParticipant.Identity()) loopers, err := provider2.CreateVideoLoopers(resolution, codec, true) if err != nil { return "", err From 8580669a4d5e87b80bda6c54cadbadfc01af4e5c Mon Sep 17 00:00:00 2001 From: zesun Date: Wed, 20 Aug 2025 23:43:53 +0800 Subject: [PATCH 5/6] fix Println format --- pkg/loadtester/loadtester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loadtester/loadtester.go b/pkg/loadtester/loadtester.go index d70b1015..9ebf3154 100644 --- a/pkg/loadtester/loadtester.go +++ b/pkg/loadtester/loadtester.go @@ -318,7 +318,7 @@ func (t *LoadTester) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Rem kind: pub.Kind(), } t.stats.Store(track.ID(), s) - fmt.Println("subscribed to room ", t.room.Name(), " - track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal)) + fmt.Println("subscribed to room", t.room.Name(), "- track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal)) // consume track go t.consumeTrack(track, pub, rp) From 588c05d2a77d85295ec05e37be4abc0bdf2e2fcd Mon Sep 17 00:00:00 2001 From: zesun Date: Thu, 21 Aug 2025 21:45:56 +0800 Subject: [PATCH 6/6] fix error conversion from int64 to string --- pkg/loadtester/loadtest.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loadtester/loadtest.go b/pkg/loadtester/loadtest.go index 5ba246f5..9d5477d9 100644 --- a/pkg/loadtester/loadtest.go +++ b/pkg/loadtester/loadtest.go @@ -361,7 +361,7 @@ func (t *LoadTest) RunStress(ctx context.Context) error { formatBitrate(s.bytes, s.elapsed), formatBitrate(s.bytes/int64(len(summaries)), s.elapsed), ) - summaryTable.Row(roomName, fmt.Sprintf("%d/%d", s.tracks, s.expected), sBitrate, sDropped, string(s.errCount)) + summaryTable.Row(roomName, fmt.Sprintf("%d/%d", s.tracks, s.expected), sBitrate, sDropped, strconv.FormatInt(s.errCount, 10)) totals.tracks += s.tracks totals.expected += s.expected @@ -381,7 +381,7 @@ func (t *LoadTest) RunStress(ctx context.Context) error { formatBitrate((totals.bytes*int64(len(roomsStats))/summariesLen), totals.elapsed), ) - summaryTable.Row("Total", fmt.Sprintf("%d/%d", totals.tracks, totals.expected), sBitrate, sDropped, string(totals.errCount)) + summaryTable.Row("Total", fmt.Sprintf("%d/%d", totals.tracks, totals.expected), sBitrate, sDropped, strconv.FormatInt(totals.errCount, 10)) } fmt.Println("\nRoom summaries:")